# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bf07cfae Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bf07cfae Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bf07cfae Branch: refs/heads/master Commit: bf07cfaea33a59d6bfd310ace13b562f4c086097 Parents: 81e0195 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 11:32:07 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 11:32:07 2014 +0300 ---------------------------------------------------------------------- .../examples/ggfs/GgfsMapReduceExample.java | 4 +- .../main/java/org/apache/ignite/IgniteFs.java | 2 +- .../apache/ignite/fs/IgniteFsConfiguration.java | 4 +- .../ignite/fs/mapreduce/IgniteFsFileRange.java | 72 +++++++ .../IgniteFsInputStreamJobAdapter.java | 45 +++++ .../apache/ignite/fs/mapreduce/IgniteFsJob.java | 62 ++++++ .../ignite/fs/mapreduce/IgniteFsJobAdapter.java | 20 ++ .../fs/mapreduce/IgniteFsRangeInputStream.java | 189 +++++++++++++++++++ .../fs/mapreduce/IgniteFsRecordResolver.java | 49 +++++ .../ignite/fs/mapreduce/IgniteFsTask.java | 165 ++++++++++++++++ .../ignite/fs/mapreduce/IgniteFsTaskArgs.java | 74 ++++++++ .../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ++++ .../org/apache/ignite/fs/mapreduce/package.html | 15 ++ .../grid/ggfs/mapreduce/IgniteFsFileRange.java | 72 ------- .../IgniteFsInputStreamJobAdapter.java | 45 ----- .../grid/ggfs/mapreduce/IgniteFsJob.java | 62 ------ .../grid/ggfs/mapreduce/IgniteFsJobAdapter.java | 20 -- .../mapreduce/IgniteFsRangeInputStream.java | 189 ------------------- .../ggfs/mapreduce/IgniteFsRecordResolver.java | 49 ----- .../grid/ggfs/mapreduce/IgniteFsTask.java | 165 ---------------- .../grid/ggfs/mapreduce/IgniteFsTaskArgs.java | 74 -------- .../mapreduce/IgniteFsTaskNoReduceAdapter.java | 34 ---- .../gridgain/grid/ggfs/mapreduce/package.html | 15 -- 
.../IgniteFsByteDelimiterRecordResolver.java | 2 +- .../IgniteFsFixedLengthRecordResolver.java | 2 +- .../processors/ggfs/GridGgfsAsyncImpl.java | 2 +- .../kernal/processors/ggfs/GridGgfsImpl.java | 2 +- .../kernal/processors/ggfs/GridGgfsJobImpl.java | 2 +- .../processors/ggfs/GridGgfsProcessor.java | 2 +- .../ggfs/GridGgfsProcessorAdapter.java | 2 +- .../processors/ggfs/GridNoopGgfsProcessor.java | 2 +- .../processors/ggfs/IgniteFsTaskArgsImpl.java | 2 +- .../processors/ggfs/GridGgfsTaskSelfTest.java | 4 +- .../GridGgfsAbstractRecordResolverSelfTest.java | 2 +- ...GgfsByteDelimiterRecordResolverSelfTest.java | 2 +- ...idGgfsFixedLengthRecordResolverSelfTest.java | 2 +- ...sNewLineDelimiterRecordResolverSelfTest.java | 2 +- ...fsStringDelimiterRecordResolverSelfTest.java | 2 +- ...idHadoopDefaultMapReducePlannerSelfTest.java | 2 +- 39 files changed, 747 insertions(+), 747 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java index ec9c52a..10913b0 100644 --- a/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java +++ b/examples/src/main/java/org/gridgain/examples/ggfs/GgfsMapReduceExample.java @@ -12,15 +12,15 @@ package org.gridgain.examples.ggfs; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import java.io.*; import java.util.*; /** - * Example that shows how to use {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask} to 
find lines matching particular pattern in the file in pretty + * Example that shows how to use {@link org.apache.ignite.fs.mapreduce.IgniteFsTask} to find lines matching particular pattern in the file in pretty * the same way as {@code grep} command does. * <p> * Remote nodes should always be started with configuration file which includes http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/IgniteFs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java index 323a778..11e4732 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteFs.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteFs.java @@ -10,9 +10,9 @@ package org.apache.ignite; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.jetbrains.annotations.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java index 13e6746..f98b92b 100644 --- a/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/fs/IgniteFsConfiguration.java @@ -753,13 +753,13 @@ public class IgniteFsConfiguration { /** * Get maximum default range size of a file being split during GGFS task execution. When GGFS task is about to - * be executed, it requests file block locations first. 
Each location is defined as {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsFileRange} which + * be executed, it requests file block locations first. Each location is defined as {@link org.apache.ignite.fs.mapreduce.IgniteFsFileRange} which * has length. In case this parameter is set to positive value, then GGFS will split single file range into smaller * ranges with length not greater that this parameter. The only exception to this case is when maximum task range * length is smaller than file block size. In this case maximum task range size will be overridden and set to file * block size. * <p> - * Note that this parameter is applied when task is split into jobs before {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsRecordResolver} is + * Note that this parameter is applied when task is split into jobs before {@link org.apache.ignite.fs.mapreduce.IgniteFsRecordResolver} is * applied. Therefore, final file ranges being assigned to particular jobs could be greater than value of this * parameter depending on file data layout and selected resolver type. 
* <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java new file mode 100644 index 0000000..9e49cb1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsFileRange.java @@ -0,0 +1,72 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.fs.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Entity representing part of GGFS file identified by file path, start position, and length. + */ +public class IgniteFsFileRange { + /** File path. */ + private IgniteFsPath path; + + /** Start position. */ + private long start; + + /** Length. */ + private long len; + + /** + * Creates file range. + * + * @param path File path. + * @param start Start position. + * @param len Length. + */ + public IgniteFsFileRange(IgniteFsPath path, long start, long len) { + this.path = path; + this.start = start; + this.len = len; + } + + /** + * Gets file path. + * + * @return File path. + */ + public IgniteFsPath path() { + return path; + } + + /** + * Gets range start position. + * + * @return Start position. + */ + public long start() { + return start; + } + + /** + * Gets range length. + * + * @return Length. 
+ */ + public long length() { + return len; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsFileRange.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java new file mode 100644 index 0000000..23449be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsInputStreamJobAdapter.java @@ -0,0 +1,45 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.gridgain.grid.*; +import org.gridgain.grid.util.*; + +import java.io.*; + +/** + * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.fs.IgniteFsInputStream} to bytes within + * the {@link IgniteFsFileRange} assigned to the job. + * <p> + * Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps in into + * {@link GridFixedSizeInputStream} limited to range length. 
+ */ +public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter { + /** {@inheritDoc} */ + @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) + throws GridException, IOException { + in.seek(range.start()); + + return execute(ggfs, new IgniteFsRangeInputStream(in, range)); + } + + /** + * Executes this job. + * + * @param ggfs GGFS instance. + * @param in Input stream. + * @return Execution result. + * @throws GridException If execution failed. + * @throws IOException If IO exception encountered while working with stream. + */ + public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java new file mode 100644 index 0000000..b4921eb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJob.java @@ -0,0 +1,62 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.gridgain.grid.*; + +import java.io.*; + +/** + * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the + * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods. 
+ * <p> + * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.fs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this + * job is expected to operate on, and already opened {@link org.apache.ignite.fs.IgniteFsInputStream} for the file this range belongs to. + * <p> + * Note that provided input stream has position already adjusted to range start. However, it will not + * automatically stop on range end. This is done to provide capability in some cases to look beyond + * the range end or seek position before the range start. + * <p> + * In majority of the cases, when you want to process only provided range, you should explicitly control amount + * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates + * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with + * {@link IgniteFsRangeInputStream}. + * <p> + * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations. + */ +public interface IgniteFsJob { + /** + * Executes this job. + * + * @param ggfs GGFS instance. + * @param range File range aligned to record boundaries. + * @param in Input stream for split file. This input stream is not aligned to range and points to file start + * by default. + * @return Execution result. + * @throws GridException If execution failed. + * @throws IOException If file system operation resulted in IO exception. + */ + public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws GridException, + IOException; + + /** + * This method is called when system detects that completion of this + * job can no longer alter the overall outcome (for example, when parent task + * has already reduced the results). Job is also cancelled when + * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called. 
+ * <p> + * Note that job cancellation is only a hint, and just like with + * {@link Thread#interrupt()} method, it is really up to the actual job + * instance to gracefully finish execution and exit. + */ + public void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java new file mode 100644 index 0000000..af20038 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsJobAdapter.java @@ -0,0 +1,20 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +/** + * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method. + */ +public abstract class IgniteFsJobAdapter implements IgniteFsJob { + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java new file mode 100644 index 0000000..f59a7d3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRangeInputStream.java @@ -0,0 +1,189 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.fs.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Decorator for regular {@link org.apache.ignite.fs.IgniteFsInputStream} which streams only data within the given range. + * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create + * jobs which will be working only with the assigned range. You can also use it explicitly when + * working with {@link IgniteFsJob} directly. + */ +public final class IgniteFsRangeInputStream extends IgniteFsInputStream { + /** Base input stream. */ + private final IgniteFsInputStream is; + + /** Start position. */ + private final long start; + + /** Maximum stream length. */ + private final long maxLen; + + /** Current position within the stream. */ + private long pos; + + /** + * Constructor. + * + * @param is Base input stream. + * @param start Start position. + * @param maxLen Maximum stream length. + * @throws IOException In case of exception. 
+ */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException { + if (is == null) + throw new IllegalArgumentException("Input stream cannot be null."); + + if (start < 0) + throw new IllegalArgumentException("Start position cannot be negative."); + + if (start >= is.length()) + throw new IllegalArgumentException("Start position cannot be greater that file length."); + + if (maxLen < 0) + throw new IllegalArgumentException("Length cannot be negative."); + + if (start + maxLen > is.length()) + throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length."); + + this.is = is; + this.start = start; + this.maxLen = maxLen; + + is.seek(start); + } + + /** {@inheritDoc} */ + @Override public long length() { + return is.length(); + } + + /** + * Constructor. + * + * @param is Base input stream. + * @param range File range. + * @throws IOException In case of exception. + */ + public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException { + this(is, range.start(), range.length()); + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + if (pos < maxLen) { + int res = is.read(); + + if (res != -1) + pos++; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { + if (pos < maxLen) { + len = (int)Math.min(len, maxLen - pos); + + int res = is.read(b, off, len); + + if (res != -1) + pos += res; + + return res; + } + else + return -1; + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + seek(pos); + + return read(buf, off, len); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf) throws IOException { + readFully(pos, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(long pos, byte[] buf, int off, int 
len) throws IOException { + seek(pos); + + for (int readBytes = 0; readBytes < len;) { + int read = read(buf, off + readBytes, len - readBytes); + + if (read == -1) + throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos + + ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); + + readBytes += read; + } + } + + /** {@inheritDoc} */ + @Override public void seek(long pos) throws IOException { + if (pos < 0) + throw new IOException("Seek position cannot be negative: " + pos); + + is.seek(start + pos); + + this.pos = pos; + } + + /** {@inheritDoc} */ + @Override public long position() { + return pos; + } + + /** + * Since range input stream represents a part of larger file stream, there is an offset at which this + * range input stream starts in original input stream. This method returns start offset of this input + * stream relative to original input stream. + * + * @return Start offset in original input stream. + */ + public long startOffset() { + return start; + } + + /** {@inheritDoc} */ + @Override public int available() { + long l = maxLen - pos; + + if (l < 0) + return 0; + + if (l > Integer.MAX_VALUE) + return Integer.MAX_VALUE; + + return (int)l; + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + is.close(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteFsRangeInputStream.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java new file mode 100644 index 0000000..3e347b7 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsRecordResolver.java @@ -0,0 +1,49 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.fs.*; +import org.gridgain.grid.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain + * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual + * execution in order to adjust record boundaries in a way consistent with user data. + * <p> + * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file. + * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver + * you can adjust job range so that it covers the whole line(s). + * <p> + * The following record resolvers are available out of the box: + * <ul> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li> + * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li> + * </ul> + */ +public interface IgniteFsRecordResolver extends Serializable { + /** + * Adjusts record start offset and length. + * + * @param ggfs GGFS instance to use. + * @param stream Input stream for split file. + * @param suggestedRecord Suggested file system record. 
+ * @return New adjusted record. If this method returns {@code null}, original record is ignored. + * @throws GridException If resolve failed. + * @throws IOException If resolve failed. + */ + @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, + IgniteFsFileRange suggestedRecord) throws GridException, IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java new file mode 100644 index 0000000..c1383bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTask.java @@ -0,0 +1,165 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.fs.*; +import org.apache.ignite.resources.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.ggfs.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task + * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. 
Instead of implementing + * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement + * {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. + * <p> + * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of + * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size + * is provided (either through {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()} + * argument), then ranges could be further divided into smaller chunks. + * <p> + * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a + * {@link IgniteFsJob}. + * <p> + * Finally all generated jobs are sent to Grid nodes for execution. + * <p> + * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step. + * <p> + * Here is an example of such a task: + * <pre name="code" class="java"> + * public class WordCountTask extends GridGgfsTask<String, Integer> { + * @Override + * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException { + * // New job will be created for each range within each file. + * // We pass user-provided argument (which is essentially a word to look for) to that job. + * return new WordCountJob(args.userArgument()); + * } + * + * // Aggregate results into one compound result. + * public Integer reduce(List<GridComputeJobResult> results) throws GridException { + * Integer total = 0; + * + * for (GridComputeJobResult res : results) { + * Integer cnt = res.getData(); + * + * // Null can be returned for non-existent file in case we decide to ignore such situations. 
+ * if (cnt != null) + * total += cnt; + * } + * + * return total; + * } + * } + * </pre> + */ +public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable IgniteFsTaskArgs<T> args) throws GridException { + assert ignite != null; + assert args != null; + + IgniteFs ggfs = ignite.fileSystem(args.ggfsName()); + GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs(); + + Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); + + Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid); + + for (IgniteFsPath path : args.paths()) { + IgniteFsFile file = ggfs.info(path); + + if (file == null) { + if (args.skipNonExistentFiles()) + continue; + else + throw new GridException("Failed to process GGFS file because it doesn't exist: " + path); + } + + Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength()); + + long totalLen = 0; + + for (IgniteFsBlockLocation loc : aff) { + ClusterNode node = null; + + for (UUID nodeId : loc.nodeIds()) { + node = nodes.get(nodeId); + + if (node != null) + break; + } + + if (node == null) + throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc + + ", subgrid=" + subgrid + ']'); + + IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args); + + if (job != null) { + ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(), + loc.length(), args.recordResolver()); + + splitMap.put(jobImpl, node); + } + + totalLen += loc.length(); + } + + assert totalLen == file.length(); + } + + return splitMap; + } + + /** + * Callback invoked during task map procedure to create job that 
will process specified split + * for GGFS file. + * + * @param path Path. + * @param range File range based on consecutive blocks. This range will be further + * realigned to record boundaries on destination node. + * @param args Task argument. + * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped. + * @throws GridException If job creation failed. + */ + @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, + IgniteFsTaskArgs<T> args) throws GridException; + + /** + * Maps list by node ID. + * + * @param subgrid Subgrid. + * @return Map. + */ + private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) { + Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size()); + + for (ClusterNode node : subgrid) + res.put(node.id(), node); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java new file mode 100644 index 0000000..3880fd3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskArgs.java @@ -0,0 +1,74 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.fs.*; + +import java.util.*; + +/** + * GGFS task arguments. 
When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, + * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is + * passed to {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. + * <p> + * Task arguments encapsulates the following data: + * <ul> + * <li>GGFS name</li> + * <li>File paths passed to {@code GridGgfs.execute()} method</li> + * <li>{@link IgniteFsRecordResolver} for that task</li> + * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> + * <li>User-defined task argument</li> + * <li>Maximum file range length for that task (see {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> + * </ul> + */ +public interface IgniteFsTaskArgs<T> { + /** + * Gets GGFS name. + * + * @return GGFS name. + */ + public String ggfsName(); + + /** + * Gets file paths to process. + * + * @return File paths to process. + */ + public Collection<IgniteFsPath> paths(); + + /** + * Gets record resolver for the task. + * + * @return Record resolver. + */ + public IgniteFsRecordResolver recordResolver(); + + /** + * Flag indicating whether to fail or simply skip non-existent files. + * + * @return {@code True} if non-existent files should be skipped. + */ + public boolean skipNonExistentFiles(); + + /** + * User argument provided for task execution. + * + * @return User argument. + */ + public T userArgument(); + + /** + * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including + * all consecutive blocks will be used without any limitations. + * + * @return Maximum range length. 
+ */ + public long maxRangeLength(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java new file mode 100644 index 0000000..45c6e89 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/IgniteFsTaskNoReduceAdapter.java @@ -0,0 +1,34 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.fs.mapreduce; + +import org.apache.ignite.compute.*; + +import java.util.*; + +/** + * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in + * results returned by jobs. + */ +public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Default implementation which will ignore all results sent from execution nodes. + * + * @param results Received results of broadcasted remote executions. Note that if task class has + * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty. + * @return Will always return {@code null}. 
+ */ + @Override public R reduce(List<ComputeJobResult> results) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html new file mode 100644 index 0000000..cde37cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/fs/mapreduce/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains APIs for In-Memory MapReduce over GGFS. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java deleted file mode 100644 index 57ca491..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsFileRange.java +++ /dev/null @@ -1,72 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.fs.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Entity representing part of GGFS file identified by file path, start position, and length. - */ -public class IgniteFsFileRange { - /** File path. */ - private IgniteFsPath path; - - /** Start position. */ - private long start; - - /** Length. */ - private long len; - - /** - * Creates file range. - * - * @param path File path. - * @param start Start position. - * @param len Length. - */ - public IgniteFsFileRange(IgniteFsPath path, long start, long len) { - this.path = path; - this.start = start; - this.len = len; - } - - /** - * Gets file path. - * - * @return File path. - */ - public IgniteFsPath path() { - return path; - } - - /** - * Gets range start position. - * - * @return Start position. - */ - public long start() { - return start; - } - - /** - * Gets range length. - * - * @return Length. 
- */ - public long length() { - return len; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsFileRange.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java deleted file mode 100644 index 0432046..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsInputStreamJobAdapter.java +++ /dev/null @@ -1,45 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.gridgain.grid.*; -import org.gridgain.grid.util.*; - -import java.io.*; - -/** - * Convenient {@link IgniteFsJob} adapter. It limits data returned from {@link org.apache.ignite.fs.IgniteFsInputStream} to bytes within - * the {@link IgniteFsFileRange} assigned to the job. - * <p> - * Under the covers it simply puts job's {@code GridGgfsInputStream} position to range start and wraps in into - * {@link GridFixedSizeInputStream} limited to range length. 
- */ -public abstract class IgniteFsInputStreamJobAdapter extends IgniteFsJobAdapter { - /** {@inheritDoc} */ - @Override public final Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) - throws GridException, IOException { - in.seek(range.start()); - - return execute(ggfs, new IgniteFsRangeInputStream(in, range)); - } - - /** - * Executes this job. - * - * @param ggfs GGFS instance. - * @param in Input stream. - * @return Execution result. - * @throws GridException If execution failed. - * @throws IOException If IO exception encountered while working with stream. - */ - public abstract Object execute(IgniteFs ggfs, IgniteFsRangeInputStream in) throws GridException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java deleted file mode 100644 index 5bcf6b3..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJob.java +++ /dev/null @@ -1,62 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.gridgain.grid.*; - -import java.io.*; - -/** - * Defines executable unit for {@link IgniteFsTask}. Before this job is executed, it is assigned one of the - * ranges provided by the {@link IgniteFsRecordResolver} passed to one of the {@code GridGgfs.execute(...)} methods. 
- * <p> - * {@link #execute(org.apache.ignite.IgniteFs, IgniteFsFileRange, org.apache.ignite.fs.IgniteFsInputStream)} method is given {@link IgniteFsFileRange} this - * job is expected to operate on, and already opened {@link org.apache.ignite.fs.IgniteFsInputStream} for the file this range belongs to. - * <p> - * Note that provided input stream has position already adjusted to range start. However, it will not - * automatically stop on range end. This is done to provide capability in some cases to look beyond - * the range end or seek position before the reange start. - * <p> - * In majority of the cases, when you want to process only provided range, you should explicitly control amount - * of returned data and stop at range end. You can also use {@link IgniteFsInputStreamJobAdapter}, which operates - * on {@link IgniteFsRangeInputStream} bounded to range start and end, or manually wrap provided input stream with - * {@link IgniteFsRangeInputStream}. - * <p> - * You can inject any resources in concrete implementation, just as with regular {@link org.apache.ignite.compute.ComputeJob} implementations. - */ -public interface IgniteFsJob { - /** - * Executes this job. - * - * @param ggfs GGFS instance. - * @param range File range aligned to record boundaries. - * @param in Input stream for split file. This input stream is not aligned to range and points to file start - * by default. - * @return Execution result. - * @throws GridException If execution failed. - * @throws IOException If file system operation resulted in IO exception. - */ - public Object execute(IgniteFs ggfs, IgniteFsFileRange range, IgniteFsInputStream in) throws GridException, - IOException; - - /** - * This method is called when system detects that completion of this - * job can no longer alter the overall outcome (for example, when parent task - * has already reduced the results). Job is also cancelled when - * {@link org.apache.ignite.compute.ComputeTaskFuture#cancel()} is called. 
- * <p> - * Note that job cancellation is only a hint, and just like with - * {@link Thread#interrupt()} method, it is really up to the actual job - * instance to gracefully finish execution and exit. - */ - public void cancel(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java deleted file mode 100644 index e8aa1ea..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsJobAdapter.java +++ /dev/null @@ -1,20 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -/** - * Adapter for {@link IgniteFsJob} with no-op implementation of {@link #cancel()} method. - */ -public abstract class IgniteFsJobAdapter implements IgniteFsJob { - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java deleted file mode 100644 index 6c123eb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRangeInputStream.java +++ /dev/null @@ -1,189 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.fs.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Decorator for regular {@link org.apache.ignite.fs.IgniteFsInputStream} which streams only data within the given range. - * This stream is used for {@link IgniteFsInputStreamJobAdapter} convenience adapter to create - * jobs which will be working only with the assigned range. You can also use it explicitly when - * working with {@link IgniteFsJob} directly. - */ -public final class IgniteFsRangeInputStream extends IgniteFsInputStream { - /** Base input stream. */ - private final IgniteFsInputStream is; - - /** Start position. */ - private final long start; - - /** Maximum stream length. */ - private final long maxLen; - - /** Current position within the stream. */ - private long pos; - - /** - * Constructor. - * - * @param is Base input stream. - * @param start Start position. - * @param maxLen Maximum stream length. - * @throws IOException In case of exception. 
- */ - public IgniteFsRangeInputStream(IgniteFsInputStream is, long start, long maxLen) throws IOException { - if (is == null) - throw new IllegalArgumentException("Input stream cannot be null."); - - if (start < 0) - throw new IllegalArgumentException("Start position cannot be negative."); - - if (start >= is.length()) - throw new IllegalArgumentException("Start position cannot be greater that file length."); - - if (maxLen < 0) - throw new IllegalArgumentException("Length cannot be negative."); - - if (start + maxLen > is.length()) - throw new IllegalArgumentException("Sum of start position and length cannot be greater than file length."); - - this.is = is; - this.start = start; - this.maxLen = maxLen; - - is.seek(start); - } - - /** {@inheritDoc} */ - @Override public long length() { - return is.length(); - } - - /** - * Constructor. - * - * @param is Base input stream. - * @param range File range. - * @throws IOException In case of exception. - */ - public IgniteFsRangeInputStream(IgniteFsInputStream is, IgniteFsFileRange range) throws IOException { - this(is, range.start(), range.length()); - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - if (pos < maxLen) { - int res = is.read(); - - if (res != -1) - pos++; - - return res; - } - else - return -1; - } - - /** {@inheritDoc} */ - @Override public int read(@NotNull byte[] b, int off, int len) throws IOException { - if (pos < maxLen) { - len = (int)Math.min(len, maxLen - pos); - - int res = is.read(b, off, len); - - if (res != -1) - pos += res; - - return res; - } - else - return -1; - } - - /** {@inheritDoc} */ - @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { - seek(pos); - - return read(buf, off, len); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf) throws IOException { - readFully(pos, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public void readFully(long pos, byte[] buf, int off, int 
len) throws IOException { - seek(pos); - - for (int readBytes = 0; readBytes < len;) { - int read = read(buf, off + readBytes, len - readBytes); - - if (read == -1) - throw new EOFException("Failed to read stream fully (stream ends unexpectedly) [pos=" + pos + - ", buf.length=" + buf.length + ", off=" + off + ", len=" + len + ']'); - - readBytes += read; - } - } - - /** {@inheritDoc} */ - @Override public void seek(long pos) throws IOException { - if (pos < 0) - throw new IOException("Seek position cannot be negative: " + pos); - - is.seek(start + pos); - - this.pos = pos; - } - - /** {@inheritDoc} */ - @Override public long position() { - return pos; - } - - /** - * Since range input stream represents a part of larger file stream, there is an offset at which this - * range input stream starts in original input stream. This method returns start offset of this input - * stream relative to original input stream. - * - * @return Start offset in original input stream. - */ - public long startOffset() { - return start; - } - - /** {@inheritDoc} */ - @Override public int available() { - long l = maxLen - pos; - - if (l < 0) - return 0; - - if (l > Integer.MAX_VALUE) - return Integer.MAX_VALUE; - - return (int)l; - } - - /** {@inheritDoc} */ - @Override public void close() throws IOException { - is.close(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteFsRangeInputStream.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java deleted file mode 100644 index 697f575..0000000 --- 
a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsRecordResolver.java +++ /dev/null @@ -1,49 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.apache.ignite.fs.*; -import org.gridgain.grid.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * GGFS record resolver. When {@link IgniteFsTask} is split into {@link IgniteFsJob}s each produced job will obtain - * {@link IgniteFsFileRange} based on file data location. Record resolver is invoked in each job before actual - * execution in order to adjust record boundaries in a way consistent with user data. - * <p> - * E.g., you may want to split your task into jobs so that each job process zero, one or several lines from that file. - * But file is split into ranges based on block locations, not new line boundaries. Using convenient record resolver - * you can adjust job range so that it covers the whole line(s). - * <p> - * The following record resolvers are available out of the box: - * <ul> - * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsFixedLengthRecordResolver}</li> - * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsByteDelimiterRecordResolver}</li> - * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsStringDelimiterRecordResolver}</li> - * <li>{@link org.gridgain.grid.ggfs.mapreduce.records.IgniteFsNewLineRecordResolver}</li> - * </ul> - */ -public interface IgniteFsRecordResolver extends Serializable { - /** - * Adjusts record start offset and length. - * - * @param ggfs GGFS instance to use. - * @param stream Input stream for split file. - * @param suggestedRecord Suggested file system record. 
- * @return New adjusted record. If this method returns {@code null}, original record is ignored. - * @throws GridException If resolve failed. - * @throws IOException If resolve failed. - */ - @Nullable public IgniteFsFileRange resolveRecords(IgniteFs ggfs, IgniteFsInputStream stream, - IgniteFsFileRange suggestedRecord) throws GridException, IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java deleted file mode 100644 index bfde75c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTask.java +++ /dev/null @@ -1,165 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.fs.*; -import org.apache.ignite.resources.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.processors.ggfs.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * GGFS task which can be executed on the grid using one of {@code GridGgfs.execute()} methods. Essentially GGFS task - * is regular {@link org.apache.ignite.compute.ComputeTask} with different map logic. 
Instead of implementing - * {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method to split task into jobs, you must implement - * {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. - * <p> - * Each file participating in GGFS task is split into {@link IgniteFsFileRange}s first. Normally range is a number of - * consequent bytes located on a single node (see {@code GridGgfsGroupDataBlocksKeyMapper}). In case maximum range size - * is provided (either through {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()} or {@code GridGgfs.execute()} - * argument), then ranges could be further divided into smaller chunks. - * <p> - * Once file is split into ranges, each range is passed to {@code GridGgfsTask.createJob()} method in order to create a - * {@link IgniteFsJob}. - * <p> - * Finally all generated jobs are sent to Grid nodes for execution. - * <p> - * As with regular {@code GridComputeTask} you can define your own logic for results handling and reduce step. - * <p> - * Here is an example of such a task: - * <pre name="code" class="java"> - * public class WordCountTask extends GridGgfsTask<String, Integer> { - * @Override - * public GridGgfsJob createJob(GridGgfsPath path, GridGgfsFileRange range, GridGgfsTaskArgs<T> args) throws GridException { - * // New job will be created for each range within each file. - * // We pass user-provided argument (which is essentially a word to look for) to that job. - * return new WordCountJob(args.userArgument()); - * } - * - * // Aggregate results into one compound result. - * public Integer reduce(List<GridComputeJobResult> results) throws GridException { - * Integer total = 0; - * - * for (GridComputeJobResult res : results) { - * Integer cnt = res.getData(); - * - * // Null can be returned for non-existent file in case we decide to ignore such situations. 
- * if (cnt != null) - * total += cnt; - * } - * - * return total; - * } - * } - * </pre> - */ -public abstract class IgniteFsTask<T, R> extends ComputeTaskAdapter<IgniteFsTaskArgs<T>, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Nullable @Override public final Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable IgniteFsTaskArgs<T> args) throws GridException { - assert ignite != null; - assert args != null; - - IgniteFs ggfs = ignite.fileSystem(args.ggfsName()); - GridGgfsProcessorAdapter ggfsProc = ((GridKernal) ignite).context().ggfs(); - - Map<ComputeJob, ClusterNode> splitMap = new HashMap<>(); - - Map<UUID, ClusterNode> nodes = mapSubgrid(subgrid); - - for (IgniteFsPath path : args.paths()) { - IgniteFsFile file = ggfs.info(path); - - if (file == null) { - if (args.skipNonExistentFiles()) - continue; - else - throw new GridException("Failed to process GGFS file because it doesn't exist: " + path); - } - - Collection<IgniteFsBlockLocation> aff = ggfs.affinity(path, 0, file.length(), args.maxRangeLength()); - - long totalLen = 0; - - for (IgniteFsBlockLocation loc : aff) { - ClusterNode node = null; - - for (UUID nodeId : loc.nodeIds()) { - node = nodes.get(nodeId); - - if (node != null) - break; - } - - if (node == null) - throw new GridException("Failed to find any of block affinity nodes in subgrid [loc=" + loc + - ", subgrid=" + subgrid + ']'); - - IgniteFsJob job = createJob(path, new IgniteFsFileRange(file.path(), loc.start(), loc.length()), args); - - if (job != null) { - ComputeJob jobImpl = ggfsProc.createJob(job, ggfs.name(), file.path(), loc.start(), - loc.length(), args.recordResolver()); - - splitMap.put(jobImpl, node); - } - - totalLen += loc.length(); - } - - assert totalLen == file.length(); - } - - return splitMap; - } - - /** - * Callback invoked during task map procedure to create job that 
will process specified split - * for GGFS file. - * - * @param path Path. - * @param range File range based on consecutive blocks. This range will be further - * realigned to record boundaries on destination node. - * @param args Task argument. - * @return GGFS job. If {@code null} is returned, the passed in file range will be skipped. - * @throws GridException If job creation failed. - */ - @Nullable public abstract IgniteFsJob createJob(IgniteFsPath path, IgniteFsFileRange range, - IgniteFsTaskArgs<T> args) throws GridException; - - /** - * Maps list by node ID. - * - * @param subgrid Subgrid. - * @return Map. - */ - private Map<UUID, ClusterNode> mapSubgrid(Collection<ClusterNode> subgrid) { - Map<UUID, ClusterNode> res = U.newHashMap(subgrid.size()); - - for (ClusterNode node : subgrid) - res.put(node.id(), node); - - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java deleted file mode 100644 index d6622ef..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskArgs.java +++ /dev/null @@ -1,74 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.fs.*; - -import java.util.*; - -/** - * GGFS task arguments. 
When you initiate new GGFS task execution using one of {@code GridGgfs.execute(...)} methods, - * all passed parameters are encapsulated in a single {@code GridGgfsTaskArgs} object. Later on this object is - * passed to {@link IgniteFsTask#createJob(org.apache.ignite.fs.IgniteFsPath, IgniteFsFileRange, IgniteFsTaskArgs)} method. - * <p> - * Task arguments encapsulates the following data: - * <ul> - * <li>GGFS name</li> - * <li>File paths passed to {@code GridGgfs.execute()} method</li> - * <li>{@link IgniteFsRecordResolver} for that task</li> - * <li>Flag indicating whether to skip non-existent file paths or throw an exception</li> - * <li>User-defined task argument</li> - * <li>Maximum file range length for that task (see {@link org.apache.ignite.fs.IgniteFsConfiguration#getMaximumTaskRangeLength()})</li> - * </ul> - */ -public interface IgniteFsTaskArgs<T> { - /** - * Gets GGFS name. - * - * @return GGFS name. - */ - public String ggfsName(); - - /** - * Gets file paths to process. - * - * @return File paths to process. - */ - public Collection<IgniteFsPath> paths(); - - /** - * Gets record resolver for the task. - * - * @return Record resolver. - */ - public IgniteFsRecordResolver recordResolver(); - - /** - * Flag indicating whether to fail or simply skip non-existent files. - * - * @return {@code True} if non-existent files should be skipped. - */ - public boolean skipNonExistentFiles(); - - /** - * User argument provided for task execution. - * - * @return User argument. - */ - public T userArgument(); - - /** - * Optional maximum allowed range length, {@code 0} by default. If not specified, full range including - * all consecutive blocks will be used without any limitations. - * - * @return Maximum range length. 
- */ - public long maxRangeLength(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java deleted file mode 100644 index 180d7a4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/IgniteFsTaskNoReduceAdapter.java +++ /dev/null @@ -1,34 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.ggfs.mapreduce; - -import org.apache.ignite.compute.*; - -import java.util.*; - -/** - * Convenient {@link IgniteFsTask} adapter with empty reduce step. Use this adapter in case you are not interested in - * results returned by jobs. - */ -public abstract class IgniteFsTaskNoReduceAdapter<T, R> extends IgniteFsTask<T, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * Default implementation which will ignore all results sent from execution nodes. - * - * @param results Received results of broadcasted remote executions. Note that if task class has - * {@link org.apache.ignite.compute.ComputeTaskNoResultCache} annotation, then this list will be empty. - * @return Will always return {@code null}. 
- */ - @Override public R reduce(List<ComputeJobResult> results) { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html deleted file mode 100644 index cde37cf..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains APIs for In-Memory MapReduce over GGFS. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java index 9ca7b54..480e574 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsByteDelimiterRecordResolver.java @@ -11,9 +11,9 @@ package org.gridgain.grid.ggfs.mapreduce.records; import org.apache.ignite.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.tostring.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java index ddfd402..b360483 100644 --- a/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java +++ b/modules/core/src/main/java/org/gridgain/grid/ggfs/mapreduce/records/IgniteFsFixedLengthRecordResolver.java @@ -11,8 +11,8 @@ package org.gridgain.grid.ggfs.mapreduce.records; import org.apache.ignite.*; 
import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.util.typedef.internal.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java index 0351392..24f0a5c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAsyncImpl.java @@ -11,9 +11,9 @@ package org.gridgain.grid.kernal.processors.ggfs; import org.apache.ignite.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.jetbrains.annotations.*; import java.net.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java index a54515c..2f00e69 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsImpl.java @@ -14,13 +14,13 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.events.*; import org.apache.ignite.fs.*; +import 
org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.eviction.*; import org.gridgain.grid.cache.eviction.ggfs.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.managers.eventstorage.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java index bdd8e66..93aff1a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsJobImpl.java @@ -12,9 +12,9 @@ package org.gridgain.grid.kernal.processors.ggfs; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.kernal.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java index 5ba2e40..335b05c 100644 --- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java @@ -14,11 +14,11 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.license.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java index a3bc423..ab4c8af 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessorAdapter.java @@ -12,7 +12,7 @@ package org.gridgain.grid.kernal.processors.ggfs; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.*; import org.gridgain.grid.util.ipc.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java index d1f9d6e..1f2875b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridNoopGgfsProcessor.java @@ -12,7 +12,7 @@ package org.gridgain.grid.kernal.processors.ggfs; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.util.ipc.*; import org.gridgain.grid.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java index 78ecc7d..fe73fe4 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsTaskArgsImpl.java @@ -10,7 +10,7 @@ package org.gridgain.grid.kernal.processors.ggfs; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.util.typedef.internal.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java index 02aa9ef..c6d6f24 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsTaskSelfTest.java @@ -13,11 +13,11 @@ import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.spi.discovery.tcp.*; import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; @@ -35,7 +35,7 @@ import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; import static org.apache.ignite.fs.IgniteFsMode.*; /** - * Tests for {@link org.gridgain.grid.ggfs.mapreduce.IgniteFsTask}. + * Tests for {@link org.apache.ignite.fs.mapreduce.IgniteFsTask}. */ public class GridGgfsTaskSelfTest extends GridGgfsCommonAbstractTest { /** Predefined words dictionary. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java index 7347f0f..905410c 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsAbstractRecordResolverSelfTest.java @@ -12,8 +12,8 @@ package org.gridgain.grid.kernal.processors.ggfs.split; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.spi.discovery.tcp.*; import org.gridgain.grid.spi.discovery.tcp.ipfinder.*; import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java index b9f6b8d..bd9712a 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java +++ 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsByteDelimiterRecordResolverSelfTest.java @@ -10,7 +10,7 @@ package org.gridgain.grid.kernal.processors.ggfs.split; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java index e9d3357..7bebc96 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsFixedLengthRecordResolverSelfTest.java @@ -10,7 +10,7 @@ package org.gridgain.grid.kernal.processors.ggfs.split; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java index b1efc24..93d7286 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsNewLineDelimiterRecordResolverSelfTest.java @@ -10,7 +10,7 @@ package org.gridgain.grid.kernal.processors.ggfs.split; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java index bea2eff..efa044a 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/split/GridGgfsStringDelimiterRecordResolverSelfTest.java @@ -10,7 +10,7 @@ package org.gridgain.grid.kernal.processors.ggfs.split; import org.apache.ignite.fs.*; -import org.gridgain.grid.ggfs.mapreduce.*; +import org.apache.ignite.fs.mapreduce.*; import org.gridgain.grid.ggfs.mapreduce.records.*; import org.gridgain.grid.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bf07cfae/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java index 51424f4..e0b444d 100644 --- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java @@ -12,10 +12,10 @@ package org.gridgain.grid.kernal.processors.hadoop; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.fs.*; +import org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.ggfs.mapreduce.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*;