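# Renaming: GridGgfs map-reduce classes (GridGgfsRangeInputStream, GridGgfsRecordResolver, GridGgfsTask) to IgniteFs*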
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7cd638fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7cd638fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7cd638fe Branch: refs/heads/master Commit: 7cd638fe62ddf8518c8e9a41f2b54ef22cf59dab Parents: b7479ed Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 11:24:58 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 11:24:58 2014 +0300 ---------------------------------------------------------------------- .../examples/ggfs/GgfsMapReduceExample.java | 6 +- .../main/java/org/apache/ignite/IgniteFs.java | 12 +- .../grid/ggfs/IgniteFsConfiguration.java | 3 +- .../mapreduce/GridGgfsRangeInputStream.java | 189 ------------------- .../ggfs/mapreduce/GridGgfsRecordResolver.java | 50 ----- .../grid/ggfs/mapreduce/GridGgfsTask.java | 165 ---------------- .../grid/ggfs/mapreduce/GridGgfsTaskArgs.java | 6 +- .../mapreduce/GridGgfsTaskNoReduceAdapter.java | 4 +- .../IgniteFsInputStreamJobAdapter.java | 4 +- .../grid/ggfs/mapreduce/IgniteFsJob.java | 8 +- .../mapreduce/IgniteFsRangeInputStream.java | 189 +++++++++++++++++++ .../ggfs/mapreduce/IgniteFsRecordResolver.java | 50 +++++ .../grid/ggfs/mapreduce/IgniteFsTask.java | 165 ++++++++++++++++ .../GridGgfsByteDelimiterRecordResolver.java | 2 +- .../GridGgfsFixedLengthRecordResolver.java | 2 +- .../processors/ggfs/GridGgfsAsyncImpl.java | 12 +- .../kernal/processors/ggfs/GridGgfsImpl.java | 26 +-- .../kernal/processors/ggfs/GridGgfsJobImpl.java | 4 +- .../processors/ggfs/GridGgfsProcessor.java | 2 +- .../ggfs/GridGgfsProcessorAdapter.java | 2 +- .../processors/ggfs/GridGgfsTaskArgsImpl.java | 8 +- .../processors/ggfs/GridNoopGgfsProcessor.java | 2 +- .../processors/ggfs/GridGgfsTaskSelfTest.java | 4 +- ...idHadoopDefaultMapReducePlannerSelfTest.java | 12 +- 24 files changed, 463 insertions(+), 464 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java index 85f103d..edd3f70 100644 --- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java +++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java @@ -20,7 +20,7 @@ import java.io.*; import java.util.*; /** - * Example that shows how to use {@link GridGgfsTask} to find lines matching particular pattern in the file in pretty + * Example that shows how to use {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask} to find lines matching particular pattern in the file in pretty * the same way as {@code grep} command does. * <p> * Remote nodes should always be started with configuration file which includes @@ -119,7 +119,7 @@ public class GgfsMapReduceExample { /** * Grep task. 
*/ - private static class GrepTask extends GridGgfsTask<String, Collection<Line>> { + private static class GrepTask extends IgniteFsTask<String, Collection<Line>> { /** {@inheritDoc} */ @Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, GridGgfsTaskArgs<String> args) throws GridException { @@ -166,7 +166,7 @@ public class GgfsMapReduceExample { } /** {@inheritDoc} */ - @Override public Object execute(IgniteFs ggfs, GridGgfsRangeInputStream in) throws GridException, IOException { + @Override public Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException { Collection<Line> res = null; long start = in.startOffset(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/apache/ignite/IgniteFs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java index 241ab3e..fd3c88c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java @@ -269,7 +269,7 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport { * @return Task result. * @throws GridException If execution failed. */ - public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException; /** @@ -289,7 +289,7 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport { * @return Task result. * @throws GridException If execution failed. 
*/ - public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException; @@ -305,8 +305,8 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport { * @return Task result. * @throws GridException If execution failed. */ - public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException; + public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException; /** * Executes GGFS task with overridden maximum range length (see @@ -324,8 +324,8 @@ public interface IgniteFs extends IgniteFsFileSystem, IgniteAsyncSupport { * @return Task result. * @throws GridException If execution failed. */ - public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + public <T, R> R execute(Class<? 
extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java index 287e93c..45a009a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/IgniteFsConfiguration.java @@ -9,7 +9,6 @@ package org.gridgain.grid.ggfs; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -762,7 +761,7 @@ public class IgniteFsConfiguration { * length is smaller than file block size. In this case maximum task range size will be overridden and set to file * block size. * <p> - * Note that this parameter is applied when task is split into jobs before {@link GridGgfsRecordResolver} is + * Note that this parameter is applied when task is split into jobs before {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsRecordResolver} is * applied. Therefore, final file ranges being assigned to particular jobs could be greater than value of this * parameter depending on file data layout and selected resolver type. 
* <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java deleted file mode 100644 index ac96c68..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRangeInputStream.java +++ /dev/null @@ -1,189 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Decorator for regular {@link org.gridgain.grid.ggfs.IgniteFsInputStream} which streams only data within the given range. - * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create - * jobs which will be working only with the assigned range. You can also use it explicitly when - * working with {@link IgniteFsJob} directly. - */ -public final class GridGgfsRangeInputStream extends IgniteFsInputStream { - /** Base input stream. */ - private final IgniteFsInputStream is; - - /** Start position. */ - private final long start; - - /** Maximum stream length. */ - private final long maxLen; - - /** Current position within the stream. */ - private long pos; - - /** - * Constructor. - * - * @param is Base input stream. - * @param start Start position. - * @param maxLen Maximum stream length. - * @throws IOException In case of exception. 
- */ - public GridGgfsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException { - if (is == null) - throw new IllegalArgumentException("Input stream cannot be null."); - - if (start < 0) - throw new IllegalArgumentException("Start position cannot be negative."); - - if (start >= is.length()) - throw new IllegalArgumentException("Start position cannot be greater that file length."); - - if (maxLen < 0) - throw new IllegalArgumentException("Length cannot be negative."); - - if (start + maxLen > is.length()) - throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length."); - - this.is = is; - this.start = start; - this.maxLen = maxLen; - - is.seek(start); - } - - /** {@inheritDoc} */ - @Override public long length() { - return is.length(); - } - - /** - * Constructor. - * - * @param is Base input stream. - * @param range File range. - * @throws IOException In case of exception. - */ - public GridGgfsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException { - this(is, range.start(), range.length()); - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - if (pos < maxLen) { - int res = is.read(); - - if (res != -1) - pos++; - - return res; - } - else - return -1; - } - - /** {@inheritDoc} */ - @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { - if (pos < maxLen) { - len = (int)Math.min(len, maxLen - pos); - - int res = is.read(b, off, len); - - if (res != -1) - pos += res; - - return res; - } - else - return -1; - } - - /** {@inheritDoc} */ - @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { - seek(pos); - - return read(buf, off, len); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf, int off, int 
len) throws IOException { - seek(pos); - - for (int readBytes = 0; readBytes < len;) { - int read = read(buf, off + readBytes, len - readBytes); - - if (read == -1) - throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos + - ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); - - readBytes += read; - } - } - - /** {@inheritDoc} */ - @Override public void seek(long pos) throws IOException { - if (pos < 0) - throw new IOException("Seek position cannot be negative: " + pos); - - is.seek(start + pos); - - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public long position() { - return pos; - } - - /** - * Since range input stream represents a part of larger file stream, there is an offset at which this - * range input stream starts in original input stream. This method returns start offset of this input - * stream relative to original input stream. - * - * @return Start offset in original input stream. - */ - public long startOffset() { - return start; - } - - /** {@inheritDoc} */ - @Override public int available() { - long l = maxLen - pos; - - if (l < 0) - return 0; - - if (l > Integer.MAX_VALUE) - return Integer.MAX_VALUE; - - return (int)l; - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - is.close(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridGgfsRangeInputStream.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java deleted file mode 100644 index e128776..0000000 --- 
a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsRecordResolver.java +++ /dev/null @@ -1,50 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.records.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * GGFS record resolver. When {@link GridGgfsTask} is split into {@link IgniteFsJob}s each produced job will obtain - * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual - * execution in order to adjust record boundaries in a way consistent with user data. - * <p> - * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file. - * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver - * you can adjust job range so that it covers the whole line(s). - * <p> - * The following record resolvers are available out of the box: - * <ul> - * <li>{@link GridGgfsFixedLengthRecordResolver}</li> - * <li>{@link GridGgfsByteDelimiterRecordResolver}</li> - * <li>{@link GridGgfsStringDelimiterRecordResolver}</li> - * <li>{@link GridGgfsNewLineRecordResolver}</li> - * </ul> - */ -public interface GridGgfsRecordResolver extends Serializable { - /** - * Adjusts record start offset and length. - * - * @param ggfs GGFS instance to use. - * @param stream Input stream for split file. - * @param suggestedRecord Suggested file system record. - * @return New adjusted record. If this method returns {@code null}, original record is ignored. 
- * @throws GridException If resolve failed. - * @throws IOException If resolve failed. - */ - @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, - IgniteFsFileRange suggestedRecord) throws GridException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java deleted file mode 100644 index 35e16de..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTask.java +++ /dev/null @@ -1,165 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.resources.*; -import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.processors.ggfs.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task - * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. 
Instead of implementing - * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement - * {@link GridGgfsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. - * <p> - * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of - * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size - * is provided (either through {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()} - * argument), then ranges could be further divided into smaller chunks. - * <p> - * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a - * {@link IgniteFsJob}. - * <p> - * Finally all generated jobs are sent to Grid nodes for execution. - * <p> - * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step. - * <p> - * Here is an example of such a task: - * <pre name="code" class="java"> - * public class WordCountTask extends GridGgfsTask<String, Integer> { - * @Override - * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException { - * // New job will be created for each range within each file. - * // We pass user-provided argument (which is essentially a word to look for) to that job. - * return new WordCountJob(args.userArgument()); - * } - * - * // Aggregate results into one compound result. - * public Integer reduce(List<GridComputeJobResult> results) throws GridException { - * Integer total = 0; - * - * for (GridComputeJobResult res : results) { - * Integer cnt = res.getData(); - * - * // Null can be returned for non-existent file in case we decide to ignore such situations. 
- * if (cnt != null) - * total += cnt; - * } - * - * return total; - * } - * } - * </pre> - */ -public abstract class GridGgfsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable GridGgfsTaskArgs<T> args) throws GridException { - assert ignite != null; - assert args != null; - - IgniteFs ggfs = ignite.fileSystem(args.ggfsName()); - GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs(); - - Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); - - Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid); - - for (IgniteFsPath path : args.paths()) { - IgniteFsFile file = ggfs.info(path); - - if (file == null) { - if (args.skipNonExistentFiles()) - continue; - else - throw new GridException("Failed to process GGFS file because it doesn't exist: " + path); - } - - Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength()); - - long totalLen = 0; - - for (IgniteFsBlockLocation loc : aff) { - ClusterNode node = null; - - for (UUID nodeId : loc.nodeIds()) { - node = nodes.get(nodeId); - - if (node != null) - break; - } - - if (node == null) - throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc + - ", subgrid=" + subgrid + ']'); - - IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args); - - if (job != null) { - ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(), - loc.length(), args.recordResolver()); - - splitMap.put(jobImpl, node); - } - - totalLen += loc.length(); - } - - assert totalLen == file.length(); - } - - return splitMap; - } - - /** - * Callback invoked during task map procedure to create job that 
will process specified split - * for GGFS file. - * - * @param path Path. - * @param range File range based on consecutive blocks. This range will be further - * realigned to record boundaries on destination node. - * @param args Task argument. - * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped. - * @throws GridException If job creation failed. - */ - @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, - GridGgfsTaskArgs<T> args) throws GridException; - - /** - * Maps list by node ID. - * - * @param subgrid Subgrid. - * @return Map. - */ - private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) { - Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size()); - - for (ClusterNode node : subgrid) - res.put(node.id(), node); - - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java index fd36ffe..caa0b44 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskArgs.java @@ -16,13 +16,13 @@ import java.util.*; /** * GGFS task arguments. When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is - * passed to {@link GridGgfsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. + * passed to {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. 
* <p> * Task arguments encapsulates the following data: * <ul> * <li>GGFS name</li> * <li>File paths passed to {@code GridGgfs.execute()} method</li> - * <li>{@link GridGgfsRecordResolver} for that task</li> + * <li>{@link IgniteFsRecordResolver} for that task</li> * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> * <li>User-defined task argument</li> * <li>Maximum file range length for that task (see {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> @@ -48,7 +48,7 @@ public interface GridGgfsTaskArgs<T> { * * @return Record resolver. */ - public GridGgfsRecordResolver recordResolver(); + public IgniteFsRecordResolver recordResolver(); /** * Flag indicating whether to fail or simply skip non-existent files. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java index 9131b09..802b7a5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/GridGgfsTaskNoReduceAdapter.java @@ -14,10 +14,10 @@ import org.apache.ignite.compute.*; import java.util.*; /** - * Convenient {@link GridGgfsTask} adapter with empty reduce step. Use this adapter in case you are not interested in + * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in * results returned by jobs. 
*/ -public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends GridGgfsTask<T, R> { +public abstract class GridGgfsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java index ddc04b3..28c0890 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java @@ -29,7 +29,7 @@ public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter { throws GridException, IOException { in.seek(range.start()); - return execute(ggfs, new GridGgfsRangeInputStream(in, range)); + return execute(ggfs, new IgniteFsRangeInputStream(in, range)); } /** @@ -41,5 +41,5 @@ public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter { * @throws GridException If execution failed. * @throws IOException If IO exception encountered while working with stream. 
*/ - public abstract Object execute(IgniteFs ggfs, GridGgfsRangeInputStream in) throws GridException, IOException; + public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java index 4aa33ec..cfd69a9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java @@ -16,8 +16,8 @@ import org.gridgain.grid.ggfs.*; import java.io.*; /** - * Defines executable unit for {@link GridGgfsTask}. Before this job is executed, it is assigned one of the - * ranges provided by the {@link GridGgfsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods. + * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the + * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods. * <p> * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.gridgain.grid.ggfs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this * job is expected to operate on, and already opened {@link org.gridgain.grid.ggfs.IgniteFsInputStream} for the file this range belongs to. @@ -28,8 +28,8 @@ import java.io.*; * <p> * In majority of the cases, when you want to process only provided range, you should explicitly control amount * of returned data and stop at range end. 
You can also use {@link IgniteFsInputStreamJobAdapter}, which operates - * on {@link GridGgfsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with - * {@link GridGgfsRangeInputStream}. + * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with + * {@link IgniteFsRangeInputStream}. * <p> * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java new file mode 100644 index 0000000..8687798 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java @@ -0,0 +1,189 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce; + +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Decorator for regular {@link org.gridgain.grid.ggfs.IgniteFsInputStream} which streams only data within the given range. + * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create + * jobs which will be working only with the assigned range. You can also use it explicitly when + * working with {@link IgniteFsJob} directly. 
+ */ +public final class IgniteFsRangeInputStream extends IgniteFsInputStream { + /** Base input stream. */ + private final IgniteFsInputStream is; + + /** Start position. */ + private final long start; + + /** Maximum stream length. */ + private final long maxLen; + + /** Current position within the stream. */ + private long pos; + + /** + * Constructor. + * + * @param is Base input stream. + * @param start Start position. + * @param maxLen Maximum stream length. + * @throws IOException In case of exception. + */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException { + if (is == null) + throw new IllegalArgumentException("Input stream cannot be null."); + + if (start < 0) + throw new IllegalArgumentException("Start position cannot be negative."); + + if (start >= is.length()) + throw new IllegalArgumentException("Start position cannot be greater that file length."); + + if (maxLen < 0) + throw new IllegalArgumentException("Length cannot be negative."); + + if (start + maxLen > is.length()) + throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length."); + + this.is = is; + this.start = start; + this.maxLen = maxLen; + + is.seek(start); + } + + /** {@inheritDoc} */ + @Override public long length() { + return is.length(); + } + + /** + * Constructor. + * + * @param is Base input stream. + * @param range File range. + * @throws IOException In case of exception. 
+ */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException { + this(is, range.start(), range.length()); + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + if (pos < maxLen) { + int res = is.read(); + + if (res != -1) + pos++; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (pos < maxLen) { + len = (int)Math.min(len, maxLen - pos); + + int res = is.read(b, off, len); + + if (res != -1) + pos += res; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + seek(pos); + + return read(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException { + seek(pos); + + for (int readBytes = 0; readBytes < len;) { + int read = read(buf, off + readBytes, len - readBytes); + + if (read == -1) + throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos + + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); + + readBytes += read; + } + } + + /** {@inheritDoc} */ + @Override public void seek(long pos) throws IOException { + if (pos < 0) + throw new IOException("Seek position cannot be negative: " + pos); + + is.seek(start + pos); + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long position() { + return pos; + } + + /** + * Since range input stream represents a part of larger file stream, there is an offset at which this + * range input stream starts in original input stream. This method returns start offset of this input + * stream relative to original input stream. 
+ * + * @return Start offset in original input stream. + */ + public long startOffset() { + return start; + } + + /** {@inheritDoc} */ + @Override public int available() { + long l = maxLen - pos; + + if (l < 0) + return 0; + + if (l > Integer.MAX_VALUE) + return Integer.MAX_VALUE; + + return (int)l; + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + is.close(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsRangeInputStream.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java new file mode 100644 index 0000000..fdddc06 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java @@ -0,0 +1,50 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce; + +import org.apache.ignite.*; +import org.gridgain.grid.*; +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.ggfs.mapreduce.records.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain + * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual + * execution in order to adjust record boundaries in a way consistent with user data. 
+ * <p> + * E.g., you may want to split your task into jobs so that each job processes zero, one or several lines from that file. + * But the file is split into ranges based on block locations, not new line boundaries. Using a suitable record resolver + * you can adjust the job range so that it covers the whole line(s). + * <p> + * The following record resolvers are available out of the box: + * <ul> + * <li>{@link GridGgfsFixedLengthRecordResolver}</li> + * <li>{@link GridGgfsByteDelimiterRecordResolver}</li> + * <li>{@link GridGgfsStringDelimiterRecordResolver}</li> + * <li>{@link GridGgfsNewLineRecordResolver}</li> + * </ul> + */ +public interface IgniteFsRecordResolver extends Serializable { + /** + * Adjusts record start offset and length. + * + * @param ggfs GGFS instance to use. + * @param stream Input stream for split file. + * @param suggestedRecord Suggested file system record. + * @return New adjusted record. If this method returns {@code null}, original record is ignored. + * @throws GridException If resolve failed. + * @throws IOException If resolve failed. 
+ */ + @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, + IgniteFsFileRange suggestedRecord) throws GridException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java new file mode 100644 index 0000000..0721d0b --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java @@ -0,0 +1,165 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.ggfs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.resources.*; +import org.gridgain.grid.*; +import org.gridgain.grid.ggfs.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.ggfs.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task + * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. Instead of implementing + * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement + * {@link IgniteFsTask#createJob(org.gridgain.grid.ggfs.IgniteFsPath, IgniteFsFileRange, GridGgfsTaskArgs)} method. 
+ * <p> + * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally a range is a number of + * consecutive bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size + * is provided (either through {@link org.gridgain.grid.ggfs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()} + * argument), then ranges could be further divided into smaller chunks. + * <p> + * Once file is split into ranges, each range is passed to {@code IgniteFsTask.createJob()} method in order to create an + * {@link IgniteFsJob}. + * <p> + * Finally all generated jobs are sent to Grid nodes for execution. + * <p> + * As with regular {@code ComputeTask} you can define your own logic for results handling and reduce step. + * <p> + * Here is an example of such a task: + * <pre name="code" class="java"> + * public class WordCountTask extends IgniteFsTask<String, Integer> { + * @Override + * public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, GridGgfsTaskArgs<String> args) throws GridException { + * // New job will be created for each range within each file. + * // We pass user-provided argument (which is essentially a word to look for) to that job. + * return new WordCountJob(args.userArgument()); + * } + * + * // Aggregate results into one compound result. + * public Integer reduce(List<GridComputeJobResult> results) throws GridException { + * Integer total = 0; + * + * for (GridComputeJobResult res : results) { + * Integer cnt = res.getData(); + * + * // Null can be returned for non-existent file in case we decide to ignore such situations. + * if (cnt != null) + * total += cnt; + * } + * + * return total; + * } + * } + * </pre> + */ +public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<GridGgfsTaskArgs<T>, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected grid. 
*/ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable GridGgfsTaskArgs<T> args) throws GridException { + assert ignite != null; + assert args != null; + + IgniteFs ggfs = ignite.fileSystem(args.ggfsName()); + GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs(); + + Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); + + Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid); + + for (IgniteFsPath path : args.paths()) { + IgniteFsFile file = ggfs.info(path); + + if (file == null) { + if (args.skipNonExistentFiles()) + continue; + else + throw new GridException("Failed to process GGFS file because it doesn't exist: " + path); + } + + Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength()); + + long totalLen = 0; + + for (IgniteFsBlockLocation loc : aff) { + ClusterNode node = null; + + for (UUID nodeId : loc.nodeIds()) { + node = nodes.get(nodeId); + + if (node != null) + break; + } + + if (node == null) + throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc + + ", subgrid=" + subgrid + ']'); + + IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args); + + if (job != null) { + ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(), + loc.length(), args.recordResolver()); + + splitMap.put(jobImpl, node); + } + + totalLen += loc.length(); + } + + assert totalLen == file.length(); + } + + return splitMap; + } + + /** + * Callback invoked during task map procedure to create job that will process specified split + * for GGFS file. + * + * @param path Path. + * @param range File range based on consecutive blocks. This range will be further + * realigned to record boundaries on destination node. + * @param args Task argument. + * @return GGFS job. 
If {@code null} is returned, the passed in file range will be skipped. + * @throws GridException If job creation failed. + */ + @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, + GridGgfsTaskArgs<T> args) throws GridException; + + /** + * Maps list by node ID. + * + * @param subgrid Subgrid. + * @return Map. + */ + private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) { + Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size()); + + for (ClusterNode node : subgrid) + res.put(node.id(), node); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java index eb9de8d..808092e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsByteDelimiterRecordResolver.java @@ -28,7 +28,7 @@ import java.util.*; * <p> * Note that you can use {@link GridGgfsStringDelimiterRecordResolver} if your delimiter is a plain string. 
*/ -public class GridGgfsByteDelimiterRecordResolver implements GridGgfsRecordResolver, Externalizable { +public class GridGgfsByteDelimiterRecordResolver implements IgniteFsRecordResolver, Externalizable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java index 3230062..1edeb1a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/GridGgfsFixedLengthRecordResolver.java @@ -21,7 +21,7 @@ import java.io.*; * Record resolver which adjusts records to fixed length. That is, start offset of the record is shifted to the * nearest position so that {@code newStart % length == 0}. 
*/ -public class GridGgfsFixedLengthRecordResolver implements GridGgfsRecordResolver, Externalizable { +public class GridGgfsFixedLengthRecordResolver implements IgniteFsRecordResolver, Externalizable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java index 6e4ad66..407249b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java @@ -46,27 +46,27 @@ public class GridGgfsAsyncImpl extends IgniteAsyncSupportAdapter implements Grid } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg)); } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException { return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? 
extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg)); } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException { return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java index 5145abd..dff827e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java @@ -1667,27 +1667,27 @@ public final class GridGgfsImpl implements GridGgfsEx { } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return executeAsync(task, 
rslvr, paths, arg).get(); } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException { return executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg).get(); } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return executeAsync(taskCls, rslvr, paths, arg).get(); } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeSize, @Nullable T arg) throws GridException { return executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeSize, arg).get(); } @@ -1701,7 +1701,7 @@ public final class GridGgfsImpl implements GridGgfsEx { * @param arg Optional task argument. * @return Execution future. 
*/ - <T, R> IgniteFuture<R> executeAsync(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + <T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) { return executeAsync(task, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } @@ -1720,7 +1720,7 @@ public final class GridGgfsImpl implements GridGgfsEx { * @param arg Optional task argument. * @return Execution future. */ - <T, R> IgniteFuture<R> executeAsync(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + <T, R> IgniteFuture<R> executeAsync(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { return ggfsCtx.kernalContext().task().execute(task, new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); @@ -1735,8 +1735,8 @@ public final class GridGgfsImpl implements GridGgfsEx { * @param arg Optional task argument. * @return Execution future. */ - <T, R> IgniteFuture<R> executeAsync(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) { + <T, R> IgniteFuture<R> executeAsync(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) { return executeAsync(taskCls, rslvr, paths, true, cfg.getMaximumTaskRangeLength(), arg); } @@ -1753,10 +1753,10 @@ public final class GridGgfsImpl implements GridGgfsEx { * @param arg Optional task argument. * @return Execution future. */ - <T, R> IgniteFuture<R> executeAsync(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + <T, R> IgniteFuture<R> executeAsync(Class<? 
extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { - return ggfsCtx.kernalContext().task().execute((Class<GridGgfsTask<T, R>>)taskCls, + return ggfsCtx.kernalContext().task().execute((Class<IgniteFsTask<T, R>>)taskCls, new GridGgfsTaskArgsImpl<>(cfg.getName(), paths, rslvr, skipNonExistentFiles, maxRangeLen, arg)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java index 2abbef7..4869103 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java @@ -42,7 +42,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs private long len; /** Split resolver. */ - private GridGgfsRecordResolver rslvr; + private IgniteFsRecordResolver rslvr; /** Injected grid. */ @IgniteInstanceResource @@ -61,7 +61,7 @@ public class GridGgfsJobImpl implements ComputeJob, GridInternalWrapper<IgniteFs * @param rslvr GGFS split resolver. 
*/ public GridGgfsJobImpl(IgniteFsJob job, String ggfsName, IgniteFsPath path, long start, long len, - GridGgfsRecordResolver rslvr) { + IgniteFsRecordResolver rslvr) { this.job = job; this.ggfsName = ggfsName; this.path = path; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java index 8a73665..99f817a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java @@ -223,7 +223,7 @@ public class GridGgfsProcessor extends GridGgfsProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, GridGgfsRecordResolver recRslv) { + long start, long length, IgniteFsRecordResolver recRslv) { return new GridGgfsJobImpl(job, ggfsName, path, start, length, recRslv); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java index 85ef324..066cd5c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java @@ -68,5 +68,5 
@@ public abstract class GridGgfsProcessorAdapter extends GridProcessorAdapter { * @return Compute job. */ @Nullable public abstract ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, GridGgfsRecordResolver recRslv); + long start, long length, IgniteFsRecordResolver recRslv); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java index 516e5a4..44cac9d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskArgsImpl.java @@ -30,7 +30,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz private Collection<IgniteFsPath> paths; /** Record resolver. */ - private GridGgfsRecordResolver recRslvr; + private IgniteFsRecordResolver recRslvr; /** Skip non existent files flag. */ private boolean skipNonExistentFiles; @@ -58,7 +58,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz * @param maxRangeLen Maximum range length. * @param usrArg User argument. 
*/ - public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, GridGgfsRecordResolver recRslvr, + public GridGgfsTaskArgsImpl(String ggfsName, Collection<IgniteFsPath> paths, IgniteFsRecordResolver recRslvr, boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { this.ggfsName = ggfsName; this.paths = paths; @@ -79,7 +79,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz } /** {@inheritDoc} */ - @Override public GridGgfsRecordResolver recordResolver() { + @Override public IgniteFsRecordResolver recordResolver() { return recRslvr; } @@ -119,7 +119,7 @@ public class GridGgfsTaskArgsImpl<T> implements GridGgfsTaskArgs<T>, Externaliz ggfsName = U.readString(in); paths = U.readCollection(in); - recRslvr = (GridGgfsRecordResolver)in.readObject(); + recRslvr = (IgniteFsRecordResolver)in.readObject(); skipNonExistentFiles = in.readBoolean(); maxRangeLen = in.readLong(); usrArg = (T)in.readObject(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java index 5654a95..e73779a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java @@ -57,7 +57,7 @@ public class GridNoopGgfsProcessor extends GridGgfsProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public ComputeJob createJob(IgniteFsJob job, @Nullable String ggfsName, IgniteFsPath path, - long start, long length, GridGgfsRecordResolver recRslv) { + long start, long length, IgniteFsRecordResolver recRslv) { return null; } } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java index 9e3838a..9443ee7 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java @@ -35,7 +35,7 @@ import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; import static org.gridgain.grid.ggfs.IgniteFsMode.*; /** - * Tests for {@link GridGgfsTask}. + * Tests for {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask}. */ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { /** Predefined words dictionary. */ @@ -228,7 +228,7 @@ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { /** * Task. 
*/ - private static class Task extends GridGgfsTask<String, IgniteBiTuple<Long, Integer>> { + private static class Task extends IgniteFsTask<String, IgniteBiTuple<Long, Integer>> { /** {@inheritDoc} */ @Override public IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, GridGgfsTaskArgs<String> args) throws GridException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7cd638fe/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java index b131f20..eca987f 100644 --- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java @@ -872,28 +872,28 @@ public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstrac } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return null; } /** {@inheritDoc} */ - @Override public <T, R> R execute(GridGgfsTask<T, R> task, @Nullable GridGgfsRecordResolver rslvr, + @Override public <T, R> R execute(IgniteFsTask<T, R> task, @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException { return null; } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? 
extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, @Nullable T arg) throws GridException { return null; } /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends GridGgfsTask<T, R>> taskCls, - @Nullable GridGgfsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, + @Override public <T, R> R execute(Class<? extends IgniteFsTask<T, R>> taskCls, + @Nullable IgniteFsRecordResolver rslvr, Collection<IgniteFsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) throws GridException { return null; }