http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 6345704..3f2b96e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; /** * Hadoop process base. @@ -69,7 +69,7 @@ public class HadoopChildProcessRunner { private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>(); /** Job instance. */ - private GridHadoopJob job; + private HadoopJob job; /** Number of uncompleted tasks. */ private final AtomicInteger pendingTasks = new AtomicInteger(); @@ -179,12 +179,12 @@ public class HadoopChildProcessRunner { onTaskFinished0(this, status); } - @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) + @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException { return shuffleJob.input(ctx); } - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) + @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException { return shuffleJob.output(ctx); }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java index 85f08be..cf550ab 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java @@ -42,7 +42,7 @@ public class HadoopV1CleanupTask extends HadoopV1Task { } /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; JobContext jobCtx = ctx.jobContext(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java index 51856d6..3501f56 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java @@ -42,8 +42,8 @@ public class HadoopV1MapTask extends HadoopV1Task { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopJob job = taskCtx.job(); + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; @@ -51,12 +51,12 @@ public class HadoopV1MapTask extends HadoopV1Task { InputFormat inFormat = jobConf.getInputFormat(); - GridHadoopInputSplit split = info().inputSplit(); + HadoopInputSplit split = info().inputSplit(); InputSplit nativeSplit; - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock block = (GridHadoopFileBlock)split; + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java index ac23bb3..348274d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java @@ -32,7 +32,7 @@ public class HadoopV1OutputCollector implements OutputCollector { private final JobConf jobConf; /** Task context. */ - private final GridHadoopTaskContext taskCtx; + private final HadoopTaskContext taskCtx; /** Optional direct writer. */ private final RecordWriter writer; @@ -47,7 +47,7 @@ public class HadoopV1OutputCollector implements OutputCollector { * @param fileName File name. * @throws IOException In case of IO exception. */ - HadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite, + HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { this.jobConf = jobConf; this.taskCtx = taskCtx; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java index 36fdd55..e45f92b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.hadoop.*; /** * Hadoop partitioner adapter for v1 API. */ -public class HadoopV1Partitioner implements GridHadoopPartitioner { +public class HadoopV1Partitioner implements HadoopPartitioner { /** Partitioner instance. */ private Partitioner<Object, Object> part; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java index b5c6bfa..87e4620 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java @@ -44,14 +44,14 @@ public class HadoopV1ReduceTask extends HadoopV1Task { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopJob job = taskCtx.job(); + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; JobConf jobConf = ctx.jobConf(); - GridHadoopTaskInput input = taskCtx.input(); + HadoopTaskInput input = taskCtx.input(); HadoopV1OutputCollector collector = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java index db4e159..d799373 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java @@ -26,14 +26,14 @@ import org.apache.ignite.internal.processors.hadoop.counter.*; */ public class HadoopV1Reporter implements Reporter { /** Context. */ - private final GridHadoopTaskContext ctx; + private final HadoopTaskContext ctx; /** * Creates new instance. * * @param ctx Context. */ - public HadoopV1Reporter(GridHadoopTaskContext ctx) { + public HadoopV1Reporter(HadoopTaskContext ctx) { this.ctx = ctx; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java index c427774..d1b4d79 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java @@ -38,7 +38,7 @@ public class HadoopV1SetupTask extends HadoopV1Task { } /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java index 0d89082..9eebbb8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java @@ -39,7 +39,7 @@ public class HadoopV1Splitter { * @return Collection of mapped splits. * @throws IgniteCheckedException If mapping failed. */ - public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { + public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { try { InputFormat<?, ?> format = jobConf.getInputFormat(); @@ -47,7 +47,7 @@ public class HadoopV1Splitter { InputSplit[] splits = format.getSplits(jobConf, 0); - Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length); + Collection<HadoopInputSplit> res = new ArrayList<>(splits.length); for (int i = 0; i < splits.length; i++) { InputSplit nativeSplit = splits[i]; @@ -55,7 +55,7 @@ public class HadoopV1Splitter { if (nativeSplit instanceof FileSplit) { FileSplit s = (FileSplit)nativeSplit; - res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); } else res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); @@ -75,7 +75,7 @@ public class HadoopV1Splitter { * @return File block or {@code null} if it is not a {@link FileSplit} instance. * @throws IgniteCheckedException If failed. */ - @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, + @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, @Nullable String[] hosts) throws IgniteCheckedException { if (!FileSplit.class.getName().equals(clsName)) return null; @@ -92,6 +92,6 @@ public class HadoopV1Splitter { if (hosts == null) hosts = EMPTY_HOSTS; - return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java index 71a259c..633bf1d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java @@ -28,7 +28,7 @@ import java.text.*; /** * Extended Hadoop v1 task. */ -public abstract class HadoopV1Task extends GridHadoopTask { +public abstract class HadoopV1Task extends HadoopTask { /** Indicates that this task is to be cancelled. */ private volatile boolean cancelled; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java index 8e968b2..496a710 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java @@ -24,7 +24,7 @@ import java.io.*; /** * Split serialized in external file. */ -public class HadoopExternalSplit extends GridHadoopInputSplit { +public class HadoopExternalSplit extends HadoopInputSplit { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java index 3e4a4c4..bb9cb68 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java @@ -27,7 +27,7 @@ import java.io.*; /** * The wrapper around external serializer. */ -public class HadoopSerializationWrapper<T> implements GridHadoopSerialization { +public class HadoopSerializationWrapper<T> implements HadoopSerialization { /** External serializer - writer. */ private final Serializer<T> serializer; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java index c73a8b0..bc7ded3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java @@ -27,7 +27,7 @@ import java.io.*; * * Warning!! This class must not depend on any Hadoop classes directly or indirectly. */ -public class HadoopSplitWrapper extends GridHadoopInputSplit { +public class HadoopSplitWrapper extends HadoopInputSplit { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java index a4b5eca..9ee6b6f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java @@ -39,10 +39,10 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc private RecordWriter writer; /** Output is provided by executor environment. */ - private final GridHadoopTaskOutput output; + private final HadoopTaskOutput output; /** Input is provided by executor environment. */ - private final GridHadoopTaskInput input; + private final HadoopTaskInput input; /** Unique identifier for a task attempt. */ private final TaskAttemptID taskAttemptID; @@ -54,7 +54,7 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc private InputSplit inputSplit; /** */ - private final GridHadoopTaskContext ctx; + private final HadoopTaskContext ctx; /** */ private String status; @@ -79,13 +79,13 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc /** {@inheritDoc} */ @Override public InputSplit getInputSplit() { if (inputSplit == null) { - GridHadoopInputSplit split = ctx.taskInfo().inputSplit(); + HadoopInputSplit split = ctx.taskInfo().inputSplit(); if (split == null) return null; - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split; + if (split instanceof HadoopFileBlock) { + HadoopFileBlock fileBlock = (HadoopFileBlock)split; inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 47535e8..902af88 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -44,7 +44,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Hadoop job implementation for v2 API. */ -public class HadoopV2Job implements GridHadoopJob { +public class HadoopV2Job implements HadoopJob { /** */ private final JobConf jobConf; @@ -64,7 +64,7 @@ public class HadoopV2Job implements GridHadoopJob { private final HadoopV2JobResourceManager rsrcMgr; /** */ - private final ConcurrentMap<T2<GridHadoopTaskType, Integer>, GridFutureAdapter<GridHadoopTaskContext>> ctxs = + private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = new ConcurrentHashMap8<>(); /** Pooling task context class and thus class loading environment. */ @@ -120,7 +120,7 @@ public class HadoopV2Job implements GridHadoopJob { } /** {@inheritDoc} */ - @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException { + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); try { @@ -146,7 +146,7 @@ public class HadoopV2Job implements GridHadoopJob { Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); try (FSDataInputStream in = fs.open(splitsFile)) { - Collection<GridHadoopInputSplit> res = new ArrayList<>(metaInfos.length); + Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length); for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { long off = metaInfo.getStartOffset(); @@ -157,7 +157,7 @@ public class HadoopV2Job implements GridHadoopJob { String clsName = Text.readString(in); - GridHadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); + HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); if (block == null) block = HadoopV2Splitter.readFileBlock(clsName, in, hosts); @@ -178,15 +178,15 @@ public class HadoopV2Job implements GridHadoopJob { } /** {@inheritDoc} */ - @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { - T2<GridHadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); + @Override public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { + T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); - GridFutureAdapter<GridHadoopTaskContext> fut = ctxs.get(locTaskId); + GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); if (fut != null) return fut.get(); - GridFutureAdapter<GridHadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); + GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); if (old != null) return old.get(); @@ -201,7 +201,7 @@ public class HadoopV2Job implements GridHadoopJob { cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); } - Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, GridHadoopJob.class, + Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, HadoopJob.class, GridHadoopJobId.class, UUID.class, DataInput.class); if (jobConfData == null) @@ -215,7 +215,7 @@ public class HadoopV2Job implements GridHadoopJob { } } - GridHadoopTaskContext res = (GridHadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, + HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, new DataInputStream(new ByteArrayInputStream(jobConfData))); fut.onDone(res); @@ -262,7 +262,7 @@ public class HadoopV2Job implements GridHadoopJob { /** {@inheritDoc} */ @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { - GridHadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); + HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); taskCtxClsPool.offer(ctx.getClass()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java index afa203f..43cbf5d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java @@ -45,12 +45,12 @@ public class HadoopV2MapTask extends HadoopV2Task { /** {@inheritDoc} */ @SuppressWarnings({"ConstantConditions", "unchecked"}) @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { - GridHadoopInputSplit split = info().inputSplit(); + HadoopInputSplit split = info().inputSplit(); InputSplit nativeSplit; - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock block = (GridHadoopFileBlock)split; + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java index 83e713b..36382d4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.processors.hadoop.*; /** * Hadoop partitioner adapter for v2 API. */ -public class HadoopV2Partitioner implements GridHadoopPartitioner { +public class HadoopV2Partitioner implements HadoopPartitioner { /** Partitioner instance. */ private Partitioner<Object, Object> part; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java index d524994..76a3329 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java @@ -40,7 +40,7 @@ public class HadoopV2Splitter { * @return Collection of mapped splits. * @throws IgniteCheckedException If mapping failed. */ - public static Collection<GridHadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException { + public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException { try { InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration()); @@ -48,7 +48,7 @@ public class HadoopV2Splitter { List<InputSplit> splits = format.getSplits(ctx); - Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.size()); + Collection<HadoopInputSplit> res = new ArrayList<>(splits.size()); int id = 0; @@ -56,7 +56,7 @@ public class HadoopV2Splitter { if (nativeSplit instanceof FileSplit) { FileSplit s = (FileSplit)nativeSplit; - res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); } else res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations())); @@ -83,7 +83,7 @@ public class HadoopV2Splitter { * @return File block or {@code null} if it is not a {@link FileSplit} instance. * @throws IgniteCheckedException If failed. */ - public static GridHadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts) + public static HadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts) throws IgniteCheckedException { if (!FileSplit.class.getName().equals(clsName)) return null; @@ -100,6 +100,6 @@ public class HadoopV2Splitter { if (hosts == null) hosts = EMPTY_HOSTS; - return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java index 04c76ee..04d84a8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java @@ -28,7 +28,7 @@ import java.io.*; /** * Extended Hadoop v2 task. */ -public abstract class HadoopV2Task extends GridHadoopTask { +public abstract class HadoopV2Task extends HadoopTask { /** Hadoop context. */ private HadoopV2Context hadoopCtx; @@ -42,7 +42,7 @@ public abstract class HadoopV2Task extends GridHadoopTask { } /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; hadoopCtx = new HadoopV2Context(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 65f6629..f0d41ae 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -45,7 +45,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Context for task execution. */ -public class HadoopV2TaskContext extends GridHadoopTaskContext { +public class HadoopV2TaskContext extends HadoopTaskContext { /** */ private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; @@ -83,7 +83,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { private volatile boolean cancelled; /** Current task. */ - private volatile GridHadoopTask task; + private volatile HadoopTask task; /** Local node ID */ private UUID locNodeId; @@ -98,7 +98,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { * @param locNodeId Local node ID. * @param jobConfDataInput DataInput for read JobConf. */ - public HadoopV2TaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job, GridHadoopJobId jobId, + public HadoopV2TaskContext(GridHadoopTaskInfo taskInfo, HadoopJob job, GridHadoopJobId jobId, @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { super(taskInfo, job); this.locNodeId = locNodeId; @@ -131,7 +131,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { } /** {@inheritDoc} */ - @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { return cntrs.counter(grp, name, cls); } @@ -145,8 +145,8 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { * * @return Task. */ - private GridHadoopTask createTask() { - boolean isAbort = taskInfo().type() == GridHadoopTaskType.ABORT; + private HadoopTask createTask() { + boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT; switch (taskInfo().type()) { case SETUP: @@ -206,7 +206,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { @Override public void cancel() { cancelled = true; - GridHadoopTask t = task; + HadoopTask t = task; if (t != null) t.cancel(); @@ -268,7 +268,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { * @param type Task type. * @return Hadoop task type. */ - private TaskType taskType(GridHadoopTaskType type) { + private TaskType taskType(HadoopTaskType type) { switch (type) { case SETUP: return TaskType.JOB_SETUP; @@ -307,7 +307,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { } /** {@inheritDoc} */ - @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException { + @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null); if (partClsOld != null) @@ -329,7 +329,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { * @return Appropriate serializer. */ @SuppressWarnings("unchecked") - private GridHadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { + private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { A.notNull(cls, "cls"); SerializationFactory factory = new SerializationFactory(jobConf); @@ -346,12 +346,12 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { } /** {@inheritDoc} */ - @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException { + @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf()); } /** {@inheritDoc} */ - @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException { + @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { return getSerialization(jobCtx.getMapOutputValueClass(), jobConf()); } @@ -392,7 +392,7 @@ public class HadoopV2TaskContext extends GridHadoopTaskContext { * @throws IgniteCheckedException if failed. */ @SuppressWarnings("unchecked") - public Object getNativeSplit(GridHadoopInputSplit split) throws IgniteCheckedException { + public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException { if (split instanceof HadoopExternalSplit) return readExternalSplit((HadoopExternalSplit)split); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java index cf47e6f..3920dd5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java @@ -28,7 +28,7 @@ import java.io.*; /** * Optimized serialization for Hadoop {@link Writable} types. */ -public class HadoopWritableSerialization implements GridHadoopSerialization { +public class HadoopWritableSerialization implements HadoopSerialization { /** */ private final Class<? extends Writable> cls; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java index f3f22fc..6eb53f9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -41,7 +41,7 @@ import java.util.*; * Hadoop client protocol tests in external process mode. */ @SuppressWarnings("ResultOfMethodCallIgnored") -public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { +public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { /** Input path. */ private static final String PATH_INPUT = "/input"; @@ -113,7 +113,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName).format(); + grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format(); setupLockFile.delete(); mapLockFile.delete(); @@ -131,7 +131,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { private void tstNextJobId() throws Exception { IgniteHadoopClientProtocolProvider provider = provider(); - ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT)); + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); JobID jobId = proto.getNewJobID(); @@ -152,7 +152,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { * @throws Exception If failed. */ public void testJobCounters() throws Exception { - IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName); + IgniteFs igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); igfs.mkdirs(new IgfsPath(PATH_INPUT)); @@ -172,7 +172,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { ); } - Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT); + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); final Job job = Job.getInstance(conf); @@ -223,7 +223,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { private void tstUnknownJobCounters() throws Exception { IgniteHadoopClientProtocolProvider provider = provider(); - ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT)); + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); try { proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1)); @@ -270,7 +270,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { * @throws Exception If failed. */ public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception { - IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName); + IgniteFs igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); igfs.mkdirs(new IgfsPath(PATH_INPUT)); @@ -280,7 +280,7 @@ public class HadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { bw.write("word"); } - Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT); + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); final Job job = Job.getInstance(conf); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java deleted file mode 100644 index eeb6509..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; - -/** - * Abstract class for Hadoop tests. - */ -public abstract class GridHadoopAbstractSelfTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** REST port. */ - protected static final int REST_PORT = 11212; - - /** IGFS name. */ - protected static final String igfsName = null; - - /** IGFS name. */ - protected static final String igfsMetaCacheName = "meta"; - - /** IGFS name. */ - protected static final String igfsDataCacheName = "data"; - - /** IGFS block size. */ - protected static final int igfsBlockSize = 1024; - - /** IGFS block group size. */ - protected static final int igfsBlockGroupSize = 8; - - /** Initial REST port. */ - private int restPort = REST_PORT; - - /** Initial classpath. */ - private static String initCp; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - // Add surefire classpath to regular classpath. - initCp = System.getProperty("java.class.path"); - - String surefireCp = System.getProperty("surefire.test.class.path"); - - if (surefireCp != null) - System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); - - super.beforeTestsStarted(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - // Restore classpath. - System.setProperty("java.class.path", initCp); - - initCp = null; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - if (igfsEnabled()) { - cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); - - cfg.setIgfsConfiguration(igfsConfiguration()); - } - - if (restEnabled()) { - ConnectorConfiguration clnCfg = new ConnectorConfiguration(); - - clnCfg.setPort(restPort++); - - cfg.setConnectorConfiguration(clnCfg); - } - - cfg.setLocalHost("127.0.0.1"); - cfg.setPeerClassLoadingEnabled(false); - - return cfg; - } - - /** - * @param gridName Grid name. - * @return Hadoop configuration. - */ - public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = new GridHadoopConfiguration(); - - cfg.setMaxParallelTasks(3); - - return cfg; - } - - /** - * @return IGFS configuration. - */ - public IgfsConfiguration igfsConfiguration() { - IgfsConfiguration cfg = new IgfsConfiguration(); - - cfg.setName(igfsName); - cfg.setBlockSize(igfsBlockSize); - cfg.setDataCacheName(igfsDataCacheName); - cfg.setMetaCacheName(igfsMetaCacheName); - cfg.setFragmentizerEnabled(false); - - return cfg; - } - - /** - * @return IGFS meta cache configuration. - */ - public CacheConfiguration metaCacheConfiguration() { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setName(igfsMetaCacheName); - cfg.setCacheMode(REPLICATED); - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setWriteSynchronizationMode(FULL_SYNC); - - return cfg; - } - - /** - * @return IGFS data cache configuration. - */ - private CacheConfiguration dataCacheConfiguration() { - CacheConfiguration cfg = new CacheConfiguration(); - - cfg.setName(igfsDataCacheName); - cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(TRANSACTIONAL); - cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize)); - cfg.setWriteSynchronizationMode(FULL_SYNC); - - return cfg; - } - - /** - * @return {@code True} if IGFS is enabled on Hadoop nodes. - */ - protected boolean igfsEnabled() { - return false; - } - - /** - * @return {@code True} if REST is enabled on Hadoop nodes. - */ - protected boolean restEnabled() { - return false; - } - - /** - * @return Number of nodes to start. - */ - protected int gridCount() { - return 3; - } - - /** - * @param cfg Config. - */ - protected void setupFileSystems(Configuration cfg) { - cfg.set("fs.defaultFS", igfsScheme()); - cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); - cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem. - class.getName()); - - HadoopFileSystemsUtils.setupFileSystems(cfg); - } - - /** - * @return IGFS scheme for test. - */ - protected String igfsScheme() { - return "igfs://:" + getTestGridName(0) + "@/"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java deleted file mode 100644 index ebbc0a6..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; - -import java.io.*; -import java.util.*; - -/** - * Abstract class for tests based on WordCount test job. - */ -public abstract class GridHadoopAbstractWordCountTest extends GridHadoopAbstractSelfTest { - /** Input path. */ - protected static final String PATH_INPUT = "/input"; - - /** Output path. */ - protected static final String PATH_OUTPUT = "/output"; - - /** IGFS instance. */ - protected IgfsEx igfs; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - // Init cache by correct LocalFileSystem implementation - FileSystem.getLocal(cfg); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - /** - * Generates test file. - * - * @param path File name. - * @param wordCounts Words and counts. - * @throws Exception If failed. - */ - protected void generateTestFile(String path, Object... wordCounts) throws Exception { - List<String> wordsArr = new ArrayList<>(); - - //Generating - for (int i = 0; i < wordCounts.length; i += 2) { - String word = (String) wordCounts[i]; - int cnt = (Integer) wordCounts[i + 1]; - - while (cnt-- > 0) - wordsArr.add(word); - } - - //Shuffling - for (int i = 0; i < wordsArr.size(); i++) { - int j = (int)(Math.random() * wordsArr.size()); - - Collections.swap(wordsArr, i, j); - } - - //Input file preparing - PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true)); - - int j = 0; - - while (j < wordsArr.size()) { - int i = 5 + (int)(Math.random() * 5); - - List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); - j += i; - - testInputFileWriter.println(Joiner.on(' ').join(subList)); - } - - testInputFileWriter.close(); - } - - /** - * Reads whole text file into String. - * - * @param fileName Name of the file to read. - * @return Content of the file as String value. - * @throws Exception If could not read the file. - */ - protected String readAndSortFile(String fileName) throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName)))); - - List<String> list = new ArrayList<>(); - - String line; - - while ((line = reader.readLine()) != null) - list.add(line); - - Collections.sort(list); - - return Joiner.on('\n').join(list) + "\n"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java deleted file mode 100644 index 47dc727..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.*; -import org.apache.ignite.*; -import org.apache.ignite.hadoop.fs.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.nio.file.*; -import java.util.*; - -/** - * Test of integration with Hadoop client via command line interface. - */ -public class GridHadoopCommandLineTest extends GridCommonAbstractTest { - /** IGFS instance. */ - private IgfsEx igfs; - - /** */ - private static final String igfsName = "igfs"; - - /** */ - private static File testWorkDir; - - /** */ - private static String hadoopHome; - - /** */ - private static String hiveHome; - - /** */ - private static File examplesJar; - - /** - * - * @param path File name. - * @param wordCounts Words and counts. - * @throws Exception If failed. - */ - private void generateTestFile(File path, Object... wordCounts) throws Exception { - List<String> wordsArr = new ArrayList<>(); - - //Generating - for (int i = 0; i < wordCounts.length; i += 2) { - String word = (String) wordCounts[i]; - int cnt = (Integer) wordCounts[i + 1]; - - while (cnt-- > 0) - wordsArr.add(word); - } - - //Shuffling - for (int i = 0; i < wordsArr.size(); i++) { - int j = (int)(Math.random() * wordsArr.size()); - - Collections.swap(wordsArr, i, j); - } - - //Writing file - try (PrintWriter writer = new PrintWriter(path)) { - int j = 0; - - while (j < wordsArr.size()) { - int i = 5 + (int)(Math.random() * 5); - - List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); - j += i; - - writer.println(Joiner.on(' ').join(subList)); - } - - writer.flush(); - } - } - - /** - * Generates two data files to join its with Hive. - * - * @throws FileNotFoundException If failed. - */ - private void generateHiveTestFiles() throws FileNotFoundException { - try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a")); - PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) { - char sep = '\t'; - - int idB = 0; - int idA = 0; - int v = 1000; - - for (int i = 0; i < 1000; i++) { - writerA.print(idA++); - writerA.print(sep); - writerA.println(idB); - - writerB.print(idB++); - writerB.print(sep); - writerB.println(v += 2); - - writerB.print(idB++); - writerB.print(sep); - writerB.println(v += 2); - } - - writerA.flush(); - writerB.flush(); - } - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - hiveHome = IgniteSystemProperties.getString("HIVE_HOME"); - - assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome)); - - hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME"); - - assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome)); - - String mapredHome = hadoopHome + "/share/hadoop/mapreduce"; - - File[] fileList = new File(mapredHome).listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - return pathname.getName().startsWith("hadoop-mapreduce-examples-") && - pathname.getName().endsWith(".jar"); - } - }); - - assertEquals("Invalid hadoop distribution.", 1, fileList.length); - - examplesJar = fileList[0]; - - testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile(); - - U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false); - - File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml"); - File dstFile = new File(testWorkDir, "mapred-site.xml"); - - try (BufferedReader in = new BufferedReader(new FileReader(srcFile)); - PrintWriter out = new PrintWriter(dstFile)) { - String line; - - while ((line = in.readLine()) != null) { - if (line.startsWith("</configuration>")) - out.println( - " <property>\n" + - " <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" + - " <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" + - " </property>\n"); - - out.println(line); - } - - out.flush(); - } - - generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50); - - generateHiveTestFiles(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - U.delete(testWorkDir); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** - * Creates the process build with appropriate environment to run Hadoop CLI. - * - * @return Process builder. - */ - private ProcessBuilder createProcessBuilder() { - String sep = ":"; - - String ggClsPath = GridHadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + - HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + - ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath(); - - ProcessBuilder res = new ProcessBuilder(); - - res.environment().put("HADOOP_HOME", hadoopHome); - res.environment().put("HADOOP_CLASSPATH", ggClsPath); - res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath()); - - res.redirectErrorStream(true); - - return res; - } - - /** - * Waits for process exit and prints the its output. - * - * @param proc Process. - * @return Exit code. - * @throws Exception If failed. - */ - private int watchProcess(Process proc) throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); - - String line; - - while ((line = reader.readLine()) != null) - log().info(line); - - return proc.waitFor(); - } - - /** - * Executes Hadoop command line tool. - * - * @param args Arguments for Hadoop command line tool. - * @return Process exit code. - * @throws Exception If failed. - */ - private int executeHadoopCmd(String... args) throws Exception { - ProcessBuilder procBuilder = createProcessBuilder(); - - List<String> cmd = new ArrayList<>(); - - cmd.add(hadoopHome + "/bin/hadoop"); - cmd.addAll(Arrays.asList(args)); - - procBuilder.command(cmd); - - log().info("Execute: " + procBuilder.command()); - - return watchProcess(procBuilder.start()); - } - - /** - * Executes Hive query. - * - * @param qry Query. - * @return Process exit code. - * @throws Exception If failed. - */ - private int executeHiveQuery(String qry) throws Exception { - ProcessBuilder procBuilder = createProcessBuilder(); - - List<String> cmd = new ArrayList<>(); - - procBuilder.command(cmd); - - cmd.add(hiveHome + "/bin/hive"); - - cmd.add("--hiveconf"); - cmd.add("hive.rpc.query.plan=true"); - - cmd.add("--hiveconf"); - cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" + - "databaseName=metastore_db;create=true"); - - cmd.add("-e"); - cmd.add(qry); - - procBuilder.command(cmd); - - log().info("Execute: " + procBuilder.command()); - - return watchProcess(procBuilder.start()); - } - - /** - * Tests Hadoop command line integration. - */ - public void testHadoopCommandLine() throws Exception { - assertEquals(0, executeHadoopCmd("fs", "-ls", "/")); - - assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input")); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input")); - - assertTrue(igfs.exists(new IgfsPath("/input/test-data"))); - - assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); - - IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/"); - - assertTrue(igfs.exists(path)); - - IgfsPath jobStatPath = null; - - for (IgfsPath jobPath : igfs.listPaths(path)) { - assertNull(jobStatPath); - - jobStatPath = jobPath; - } - - File locStatFile = new File(testWorkDir, "performance"); - - assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); - - long evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); - - assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. - - assertTrue(igfs.exists(new IgfsPath("/output"))); - - BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000")))); - - List<String> res = new ArrayList<>(); - - String line; - - while ((line = in.readLine()) != null) - res.add(line); - - Collections.sort(res); - - assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString()); - } - - /** - * Runs query check result. - * - * @param expRes Expected result. - * @param qry Query. - * @throws Exception If failed. - */ - private void checkQuery(String expRes, String qry) throws Exception { - assertEquals(0, executeHiveQuery("drop table if exists result")); - - assertEquals(0, executeHiveQuery( - "create table result " + - "row format delimited fields terminated by ' ' " + - "stored as textfile " + - "location '/result' as " + qry - )); - - IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0")); - - byte[] buf = new byte[(int) in.length()]; - - in.read(buf); - - assertEquals(expRes, new String(buf)); - } - - /** - * Tests Hive integration. - */ - public void testHiveCommandLine() throws Exception { - assertEquals(0, executeHiveQuery( - "create table table_a (" + - "id_a int," + - "id_b int" + - ") " + - "row format delimited fields terminated by '\\t'" + - "stored as textfile " + - "location '/table-a'" - )); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a")); - - assertEquals(0, executeHiveQuery( - "create table table_b (" + - "id_b int," + - "rndv int" + - ") " + - "row format delimited fields terminated by '\\t'" + - "stored as textfile " + - "location '/table-b'" - )); - - assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b")); - - checkQuery( - "0 0\n" + - "1 2\n" + - "2 4\n" + - "3 6\n" + - "4 8\n" + - "5 10\n" + - "6 12\n" + - "7 14\n" + - "8 16\n" + - "9 18\n", - "select * from table_a order by id_a limit 10" - ); - - checkQuery("2000\n", "select count(id_b) from table_b"); - - checkQuery( - "250 500 2002\n" + - "251 502 2006\n" + - "252 504 2010\n" + - "253 506 2014\n" + - "254 508 2018\n" + - "255 510 2022\n" + - "256 512 2026\n" + - "257 514 2030\n" + - "258 516 2034\n" + - "259 518 2038\n", - "select a.id_a, a.id_b, b.rndv" + - " from table_a a" + - " inner join table_b b on a.id_b = b.id_b" + - " where b.rndv > 2000" + - " order by a.id_a limit 10" - ); - - checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java deleted file mode 100644 index 3ebc8ae..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.net.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Test file systems for the working directory multi-threading support. - */ -public class GridHadoopFileSystemsTest extends GridHadoopAbstractSelfTest { - private static final int THREAD_COUNT = 3; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - - /** - * Test the file system with specified URI for the multi-thread working directory support. - * - * @param uri Base URI of the file system (scheme and authority). - * @throws Exception If fails. - */ - private void testFileSystem(final URI uri) throws Exception { - final Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, - new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); - - final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); - - final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; - final Path[] newWorkDir = new Path[THREAD_COUNT]; - final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; - final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; - - final AtomicInteger threadNum = new AtomicInteger(0); - - GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - int curThreadNum = threadNum.getAndIncrement(); - - FileSystem fs = FileSystem.get(uri, cfg); - - HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); - - if ("file".equals(uri.getScheme())) - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); - - changeUserPhase.countDown(); - changeUserPhase.await(); - - newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); - - changeDirPhase.countDown(); - changeDirPhase.await(); - - newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); - - changeAbsDirPhase.countDown(); - changeAbsDirPhase.await(); - - newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); - - finishPhase.countDown(); - } - catch (InterruptedException | IOException e) { - error("Failed to execute test thread.", e); - - fail(); - } - } - }, THREAD_COUNT, "filesystems-test"); - - finishPhase.await(); - - for (int i = 0; i < THREAD_COUNT; i ++) { - cfg.set(MRJobConfig.USER_NAME, "user" + i); - - Path workDir = new Path(new Path(uri), "user/user" + i); - - cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); - - assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); - - assertEquals(workDir, newUserInitWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); - - assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); - } - - System.out.println(System.getProperty("user.dir")); - } - - /** - * Test IGFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testIgfs() throws Exception { - testFileSystem(URI.create(igfsScheme())); - } - - /** - * Test HDFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testHdfs() throws Exception { - testFileSystem(URI.create("hdfs://localhost/")); - } - - /** - * Test LocalFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testLocal() throws Exception { - testFileSystem(URI.create("file:///")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java deleted file mode 100644 index c9c577d..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Grouping test. - */ -public class GridHadoopGroupingTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** */ - private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(GridHadoopGroupingTest.class) - .put("vals", new GridConcurrentHashSet<UUID>()); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - protected boolean igfsEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testGroupingReducer() throws Exception { - doTestGrouping(false); - } - - /** - * @throws Exception If failed. - */ - public void testGroupingCombiner() throws Exception { - doTestGrouping(true); - } - - /** - * @param combiner With combiner. - * @throws Exception If failed. - */ - public void doTestGrouping(boolean combiner) throws Exception { - vals.clear(); - - Job job = Job.getInstance(); - - job.setInputFormatClass(InFormat.class); - job.setOutputFormatClass(OutFormat.class); - - job.setOutputKeyClass(YearTemperature.class); - job.setOutputValueClass(Text.class); - - job.setMapperClass(Mapper.class); - - if (combiner) { - job.setCombinerClass(MyReducer.class); - job.setNumReduceTasks(0); - job.setCombinerKeyGroupingComparatorClass(YearComparator.class); - } - else { - job.setReducerClass(MyReducer.class); - job.setNumReduceTasks(4); - job.setGroupingComparatorClass(YearComparator.class); - } - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(30000); - - assertTrue(vals.isEmpty()); - } - - public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> { - /** */ - int lastYear; - - @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context) - throws IOException, InterruptedException { - X.println("___ : " + context.getTaskAttemptID() + " --> " + key); - - Set<UUID> ids = new HashSet<>(); - - for (Text val : vals0) - assertTrue(ids.add(UUID.fromString(val.toString()))); - - for (Text val : vals0) - assertTrue(ids.remove(UUID.fromString(val.toString()))); - - assertTrue(ids.isEmpty()); - - assertTrue(key.year > lastYear); - - lastYear = key.year; - - for (Text val : vals0) - assertTrue(vals.remove(UUID.fromString(val.toString()))); - } - } - - public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator. - /** {@inheritDoc} */ - @Override public int compare(YearTemperature o1, YearTemperature o2) { - return Integer.compare(o1.year, o2.year); - } - - /** {@inheritDoc} */ - @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - throw new IllegalStateException(); - } - } - - public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable { - /** */ - private int year; - - /** */ - private int temperature; - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - out.writeInt(year); - out.writeInt(temperature); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - year = in.readInt(); - temperature = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { // To be partitioned by year. - return year; - } - - /** {@inheritDoc} */ - @Override public int compareTo(YearTemperature o) { - int res = Integer.compare(year, o.year); - - if (res != 0) - return res; - - // Sort comparator by year and temperature, to find max for year. - return Integer.compare(o.temperature, temperature); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(YearTemperature.class, this); - } - } - - public static class InFormat extends InputFormat<YearTemperature, Text> { - /** {@inheritDoc} */ - @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - ArrayList<InputSplit> list = new ArrayList<>(); - - for (int i = 0; i < 10; i++) - list.add(new GridHadoopSortingTest.FakeSplit(20)); - - return list; - } - - /** {@inheritDoc} */ - @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new RecordReader<YearTemperature, Text>() { - /** */ - int cnt; - - /** */ - Random rnd = new GridRandom(); - - /** */ - YearTemperature key = new YearTemperature(); - - /** */ - Text val = new Text(); - - @Override public void initialize(InputSplit split, TaskAttemptContext context) { - // No-op. - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return cnt++ < split.getLength(); - } - - @Override public YearTemperature getCurrentKey() { - key.year = 1990 + rnd.nextInt(10); - key.temperature = 10 + rnd.nextInt(20); - - return key; - } - - @Override public Text getCurrentValue() { - UUID id = UUID.randomUUID(); - - assertTrue(vals.add(id)); - - val.set(id.toString()); - - return val; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - // No-op. - } - }; - } - } - - /** - * - */ - public static class OutFormat extends OutputFormat { - /** {@inheritDoc} */ - @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - } -}