http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index db95b2f..04a96de 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -70,7 +70,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>(); /** Starting processes. */ - private final ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>(); + private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>(); /** Busy lock. */ private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); @@ -135,7 +135,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { if (log.isDebugEnabled()) log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']'); - if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { + if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { if (log.isDebugEnabled()) log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() + ", proc=" + proc + ']'); @@ -191,7 +191,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") - @Override public void run(final HadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { + @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { if (!busyLock.tryReadLock()) { if (log.isDebugEnabled()) log.debug("Failed to start hadoop tasks (grid is stopping, will ignore)."); @@ -255,7 +255,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void cancelTasks(GridHadoopJobId jobId) { + @Override public void cancelTasks(HadoopJobId jobId) { HadoopProcess proc = runningProcsByJobId.get(jobId); if (proc != null) @@ -269,7 +269,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job instance. * @param tasks Collection of tasks to execute in started process. */ - private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<GridHadoopTaskInfo> tasks) + private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { // Must synchronize since concurrent process crash may happen and will receive onConnectionLost(). proc.lock(); @@ -312,10 +312,10 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param state Fail state. * @param e Optional error. */ - private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { + private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { HadoopTaskStatus fail = new HadoopTaskStatus(state, e); - for (GridHadoopTaskInfo task : tasks) + for (HadoopTaskInfo task : tasks) jobTracker.onTaskFinished(task, fail); } @@ -325,12 +325,12 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job instance. * @param plan Map reduce plan. */ - private HadoopProcess startProcess(final HadoopJob job, final GridHadoopMapReducePlan plan) { + private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) { final UUID childProcId = UUID.randomUUID(); - GridHadoopJobId jobId = job.id(); + HadoopJobId jobId = job.id(); - final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext()); + final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId, ctx.kernalContext()); final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId())); @@ -538,7 +538,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param jobId Job ID. * @return Job work folder. */ - private String jobWorkFolder(GridHadoopJobId jobId) { + private String jobWorkFolder(HadoopJobId jobId) { return outputBase + File.separator + "Job_" + jobId; } @@ -604,7 +604,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job. * @param plan Map reduce plan. */ - private void prepareForJob(HadoopProcess proc, HadoopJob job, GridHadoopMapReducePlan plan) { + private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) { try { comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), plan.reducers(), plan.reducers(ctx.localNodeId()))); @@ -647,7 +647,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { assert proc != null : "Missing child process for processId: " + desc; - GridHadoopProcessFuture fut = proc.initFut; + HadoopProcessFuture fut = proc.initFut; if (fut != null) fut.onReplyReceived(desc); @@ -684,7 +684,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { HadoopProcess proc = runningProcsByProcId.get(desc.processId()); if (proc != null) { - Collection<GridHadoopTaskInfo> tasks = proc.tasks(); + Collection<HadoopTaskInfo> tasks = proc.tasks(); if (!F.isEmpty(tasks)) { log.warning("Lost connection with alive process (will terminate): " + desc); @@ -692,7 +692,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { HadoopTaskStatus status = new HadoopTaskStatus(CRASHED, new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc)); - for (GridHadoopTaskInfo info : tasks) + for (HadoopTaskInfo info : tasks) jobTracker.onTaskFinished(info, status); runningProcsByJobId.remove(proc.jobId(), proc); @@ -716,13 +716,13 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private static final long serialVersionUID = 0L; /** Job ID. */ - private final GridHadoopJobId jobId; + private final HadoopJobId jobId; /** Process. */ private Process proc; /** Init future. Completes when process is ready to receive messages. */ - private final GridHadoopProcessFuture initFut; + private final HadoopProcessFuture initFut; /** Process descriptor. */ private HadoopProcessDescriptor procDesc; @@ -731,7 +731,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private Collection<Integer> reducers; /** Tasks. */ - private final Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>(); + private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>(); /** Terminated flag. */ private volatile boolean terminated; @@ -740,7 +740,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param jobId Job ID. * @param initFut Init future. */ - private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut, + private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut, int[] reducers) { this.jobId = jobId; this.initFut = initFut; @@ -763,7 +763,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } @@ -815,7 +815,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * * @param tasks Tasks to set. */ - private void addTasks(Collection<GridHadoopTaskInfo> tasks) { + private void addTasks(Collection<HadoopTaskInfo> tasks) { this.tasks.addAll(tasks); } @@ -824,7 +824,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * * @param task Task to remove. */ - private void removeTask(GridHadoopTaskInfo task) { + private void removeTask(HadoopTaskInfo task) { if (tasks != null) tasks.remove(task); } @@ -832,7 +832,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * @return Collection of tasks. */ - private Collection<GridHadoopTaskInfo> tasks() { + private Collection<HadoopTaskInfo> tasks() { return tasks; } @@ -852,7 +852,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * */ - private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { + private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { /** */ private static final long serialVersionUID = 0L; @@ -860,7 +860,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private UUID childProcId; /** Job ID. */ - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** Process descriptor. */ private HadoopProcessDescriptor desc; @@ -880,14 +880,14 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * Empty constructor. */ - public GridHadoopProcessFuture() { + public HadoopProcessFuture() { // No-op. } /** * @param ctx Kernal context. */ - private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) { + private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) { super(ctx); this.childProcId = childProcId;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java index 1258819..25c9408 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java @@ -33,11 +33,11 @@ public class HadoopJobInfoUpdateRequest implements HadoopMessage { /** Job ID. */ @GridToStringInclude - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** Job phase. */ @GridToStringInclude - private GridHadoopJobPhase jobPhase; + private HadoopJobPhase jobPhase; /** Reducers addresses. */ @GridToStringInclude @@ -55,7 +55,7 @@ public class HadoopJobInfoUpdateRequest implements HadoopMessage { * @param jobPhase Job phase. * @param reducersAddrs Reducers addresses. */ - public HadoopJobInfoUpdateRequest(GridHadoopJobId jobId, GridHadoopJobPhase jobPhase, + public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase, HadoopProcessDescriptor[] reducersAddrs) { assert jobId != null; @@ -67,14 +67,14 @@ public class HadoopJobInfoUpdateRequest implements HadoopMessage { /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } /** * @return Job phase. */ - public GridHadoopJobPhase jobPhase() { + public HadoopJobPhase jobPhase() { return jobPhase; } @@ -95,10 +95,10 @@ public class HadoopJobInfoUpdateRequest implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); + jobId = new HadoopJobId(); jobId.readExternal(in); - jobPhase = (GridHadoopJobPhase)in.readObject(); + jobPhase = (HadoopJobPhase)in.readObject(); reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java index 4037b26..df44dd7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java @@ -33,11 +33,11 @@ public class HadoopPrepareForJobRequest implements HadoopMessage { /** Job ID. */ @GridToStringInclude - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** Job info. */ @GridToStringInclude - private GridHadoopJobInfo jobInfo; + private HadoopJobInfo jobInfo; /** Total amount of reducers in the job. */ @GridToStringInclude @@ -60,7 +60,7 @@ public class HadoopPrepareForJobRequest implements HadoopMessage { * @param totalReducersCnt Number of reducers in the job. * @param locReducers Reducers to be executed on current node. */ - public HadoopPrepareForJobRequest(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo, int totalReducersCnt, + public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt, int[] locReducers) { assert jobId != null; @@ -73,14 +73,14 @@ public class HadoopPrepareForJobRequest implements HadoopMessage { /** * @return Job info. */ - public GridHadoopJobInfo jobInfo() { + public HadoopJobInfo jobInfo() { return jobInfo; } /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } @@ -110,10 +110,10 @@ public class HadoopPrepareForJobRequest implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); + jobId = new HadoopJobId(); jobId.readExternal(in); - jobInfo = (GridHadoopJobInfo)in.readObject(); + jobInfo = (HadoopJobInfo)in.readObject(); totalReducersCnt = in.readInt(); locReducers = U.readIntArray(in); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java index edf1840..05e12ef 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java @@ -34,55 +34,55 @@ public class HadoopTaskExecutionRequest implements HadoopMessage { /** Job ID. */ @GridToStringInclude - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** Job info. */ @GridToStringInclude - private GridHadoopJobInfo jobInfo; + private HadoopJobInfo jobInfo; /** Mappers. */ @GridToStringInclude - private Collection<GridHadoopTaskInfo> tasks; + private Collection<HadoopTaskInfo> tasks; /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } /** * @param jobId Job ID. */ - public void jobId(GridHadoopJobId jobId) { + public void jobId(HadoopJobId jobId) { this.jobId = jobId; } /** * @return Jon info. */ - public GridHadoopJobInfo jobInfo() { + public HadoopJobInfo jobInfo() { return jobInfo; } /** * @param jobInfo Job info. */ - public void jobInfo(GridHadoopJobInfo jobInfo) { + public void jobInfo(HadoopJobInfo jobInfo) { this.jobInfo = jobInfo; } /** * @return Tasks. */ - public Collection<GridHadoopTaskInfo> tasks() { + public Collection<HadoopTaskInfo> tasks() { return tasks; } /** * @param tasks Tasks. */ - public void tasks(Collection<GridHadoopTaskInfo> tasks) { + public void tasks(Collection<HadoopTaskInfo> tasks) { this.tasks = tasks; } @@ -101,10 +101,10 @@ public class HadoopTaskExecutionRequest implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); + jobId = new HadoopJobId(); jobId.readExternal(in); - jobInfo = (GridHadoopJobInfo)in.readObject(); + jobInfo = (HadoopJobInfo)in.readObject(); tasks = U.readCollection(in); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java index a516f6b..d3639c7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java @@ -32,7 +32,7 @@ public class HadoopTaskFinishedMessage implements HadoopMessage { private static final long serialVersionUID = 0L; /** Finished task info. */ - private GridHadoopTaskInfo taskInfo; + private HadoopTaskInfo taskInfo; /** Task finish status. */ private HadoopTaskStatus status; @@ -48,7 +48,7 @@ public class HadoopTaskFinishedMessage implements HadoopMessage { * @param taskInfo Finished task info. * @param status Task finish status. */ - public HadoopTaskFinishedMessage(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status) { + public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) { assert taskInfo != null; assert status != null; @@ -59,7 +59,7 @@ public class HadoopTaskFinishedMessage implements HadoopMessage { /** * @return Finished task info. */ - public GridHadoopTaskInfo taskInfo() { + public HadoopTaskInfo taskInfo() { return taskInfo; } @@ -83,7 +83,7 @@ public class HadoopTaskFinishedMessage implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - taskInfo = new GridHadoopTaskInfo(); + taskInfo = new HadoopTaskInfo(); taskInfo.readExternal(in); status = new HadoopTaskStatus(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 3f2b96e..e95b8cb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -157,7 +157,7 @@ public class HadoopChildProcessRunner { assert set; - GridHadoopTaskInfo info = F.first(req.tasks()); + HadoopTaskInfo info = F.first(req.tasks()); assert info != null; @@ -170,7 +170,7 @@ public class HadoopChildProcessRunner { log.debug("Set executor service size for task type [type=" + info.type() + ", size=" + size + ']'); - for (GridHadoopTaskInfo taskInfo : req.tasks()) { + for (HadoopTaskInfo taskInfo : req.tasks()) { if (log.isDebugEnabled()) log.debug("Submitted task for external execution: " + taskInfo); @@ -192,7 +192,7 @@ public class HadoopChildProcessRunner { } } catch (IgniteCheckedException e) { - for (GridHadoopTaskInfo info : req.tasks()) + for (HadoopTaskInfo info : req.tasks()) notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); } } @@ -265,7 +265,7 @@ public class HadoopChildProcessRunner { * @param status Task status. */ private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) { - GridHadoopTaskInfo info = run.taskInfo(); + HadoopTaskInfo info = run.taskInfo(); int pendingTasks0 = pendingTasks.decrementAndGet(); @@ -286,7 +286,7 @@ public class HadoopChildProcessRunner { * @param taskInfo Finished task info. * @param status Task status. */ - private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status, + private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status, boolean flush) { final HadoopTaskState state = status.state(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java index 66508a8..4cba117 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java @@ -33,7 +33,7 @@ public class HadoopMarshallerFilter extends GridNioFilterAdapter { * @param marshaller Marshaller to use. */ public HadoopMarshallerFilter(Marshaller marshaller) { - super("GridHadoopMarshallerFilter"); + super("HadoopMarshallerFilter"); this.marshaller = marshaller; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java index cf550ab..fa570ea 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java @@ -35,7 +35,7 @@ public class HadoopV1CleanupTask extends HadoopV1Task { * @param taskInfo Task info. * @param abort Abort flag. */ - public HadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { + public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { super(taskInfo); this.abort = abort; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java index 3501f56..ad7b058 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java @@ -36,7 +36,7 @@ public class HadoopV1MapTask extends HadoopV1Task { * * @param taskInfo */ - public HadoopV1MapTask(GridHadoopTaskInfo taskInfo) { + public HadoopV1MapTask(HadoopTaskInfo taskInfo) { super(taskInfo); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java index 87e4620..18ee09d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java @@ -36,7 +36,7 @@ public class HadoopV1ReduceTask extends HadoopV1Task { * @param taskInfo Task info. * @param reduce {@code True} if reduce, {@code false} if combine. */ - public HadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { + public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { super(taskInfo); this.reduce = reduce; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java index d1b4d79..a758f1d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java @@ -33,7 +33,7 @@ public class HadoopV1SetupTask extends HadoopV1Task { * * @param taskInfo Task info. */ - public HadoopV1SetupTask(GridHadoopTaskInfo taskInfo) { + public HadoopV1SetupTask(HadoopTaskInfo taskInfo) { super(taskInfo); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java index 633bf1d..b7da700 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java @@ -37,7 +37,7 @@ public abstract class HadoopV1Task extends HadoopTask { * * @param taskInfo Task info. */ - protected HadoopV1Task(GridHadoopTaskInfo taskInfo) { + protected HadoopV1Task(HadoopTaskInfo taskInfo) { super(taskInfo); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java index 0a2af6d..534033b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java @@ -38,7 +38,7 @@ public class HadoopV2CleanupTask extends HadoopV2Task { * @param taskInfo Task info. * @param abort Abort flag. */ - public HadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { + public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { super(taskInfo); this.abort = abort; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java index 9ee6b6f..3f8e2b6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java @@ -32,10 +32,10 @@ import java.util.*; * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. */ public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { - /** Input reader to overriding of GridHadoopTaskContext input. */ + /** Input reader to overriding of HadoopTaskContext input. */ private RecordReader reader; - /** Output writer to overriding of GridHadoopTaskContext output. */ + /** Output writer to overriding of HadoopTaskContext output. */ private RecordWriter writer; /** Output is provided by executor environment. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 902af88..f2f0cab 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -52,10 +52,10 @@ public class HadoopV2Job implements HadoopJob { private final JobContextImpl jobCtx; /** Hadoop job ID. */ - private final GridHadoopJobId jobId; + private final HadoopJobId jobId; /** Job info. */ - protected GridHadoopJobInfo jobInfo; + protected HadoopJobInfo jobInfo; /** */ private final JobID hadoopJobID; @@ -81,7 +81,7 @@ public class HadoopV2Job implements HadoopJob { * @param jobInfo Job info. * @param log Logger. */ - public HadoopV2Job(GridHadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { assert jobId != null; assert jobInfo != null; @@ -110,12 +110,12 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ - @Override public GridHadoopJobId id() { + @Override public HadoopJobId id() { return jobId; } /** {@inheritDoc} */ - @Override public GridHadoopJobInfo info() { + @Override public HadoopJobInfo info() { return jobInfo; } @@ -178,7 +178,7 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ - @Override public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); @@ -201,8 +201,8 @@ public class HadoopV2Job implements HadoopJob { cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); } - Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, HadoopJob.class, - GridHadoopJobId.class, UUID.class, DataInput.class); + Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, + HadoopJobId.class, UUID.class, DataInput.class); if (jobConfData == null) synchronized(jobConf) { @@ -256,12 +256,12 @@ public class HadoopV2Job implements HadoopJob { } /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); } /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); taskCtxClsPool.offer(ctx.getClass()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 04481bb..6f6bfa1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -47,7 +47,7 @@ public class HadoopV2JobResourceManager { private final IgniteLogger log; /** Job ID. */ - private final GridHadoopJobId jobId; + private final HadoopJobId jobId; /** Class path list. */ private URL[] clsPath; @@ -64,7 +64,7 @@ public class HadoopV2JobResourceManager { * @param ctx Hadoop job context. * @param log Logger. */ - public HadoopV2JobResourceManager(GridHadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java index 43cbf5d..2bf4292 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java @@ -38,7 +38,7 @@ public class HadoopV2MapTask extends HadoopV2Task { /** * @param taskInfo Task info. */ - public HadoopV2MapTask(GridHadoopTaskInfo taskInfo) { + public HadoopV2MapTask(HadoopTaskInfo taskInfo) { super(taskInfo); } @@ -74,7 +74,7 @@ public class HadoopV2MapTask extends HadoopV2Task { hadoopContext().reader(reader); - GridHadoopJobInfo jobInfo = taskCtx.job().info(); + HadoopJobInfo jobInfo = taskCtx.job().info(); outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java index 66ff542..250c41b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java @@ -39,7 +39,7 @@ public class HadoopV2ReduceTask extends HadoopV2Task { * @param taskInfo Task info. * @param reduce {@code True} if reduce, {@code false} if combine. */ - public HadoopV2ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { + public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { super(taskInfo); this.reduce = reduce; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java index d0ac792..81587c1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java @@ -35,7 +35,7 @@ public class HadoopV2SetupTask extends HadoopV2Task { * * @param taskInfo task info. */ - public HadoopV2SetupTask(GridHadoopTaskInfo taskInfo) { + public HadoopV2SetupTask(HadoopTaskInfo taskInfo) { super(taskInfo); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java index 04d84a8..5ade3fb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java @@ -37,7 +37,7 @@ public abstract class HadoopV2Task extends HadoopTask { * * @param taskInfo Task info. */ - protected HadoopV2Task(GridHadoopTaskInfo taskInfo) { + protected HadoopV2Task(HadoopTaskInfo taskInfo) { super(taskInfo); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index f0d41ae..24f10a6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -89,7 +90,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private UUID locNodeId; /** Counters for task. */ - private final GridHadoopCounters cntrs = new HadoopCountersImpl(); + private final HadoopCounters cntrs = new HadoopCountersImpl(); /** * @param taskInfo Task info. @@ -98,7 +99,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { * @param locNodeId Local node ID. * @param jobConfDataInput DataInput for read JobConf. */ - public HadoopV2TaskContext(GridHadoopTaskInfo taskInfo, HadoopJob job, GridHadoopJobId jobId, + public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId, @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { super(taskInfo, job); this.locNodeId = locNodeId; @@ -136,7 +137,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { } /** {@inheritDoc} */ - @Override public GridHadoopCounters counters() { + @Override public HadoopCounters counters() { return cntrs; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java index 667fec9..ffa20d1 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java @@ -17,15 +17,15 @@ package org.apache.ignite.client.hadoop; -import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.configuration.*; /** * Hadoop client protocol tests in embedded process mode. */ public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest { /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setExternalExecution(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java deleted file mode 100644 index 3e8a95a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.collect.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; -import java.util.Map.*; - -import static com.google.common.collect.Maps.*; -import static com.google.common.collect.MinMaxPriorityQueue.*; -import static java.util.Collections.*; - -/** - * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than - * 3 characters the number of occurrences ins calculated. Finally, 10 words with the highest occurrence count are - * output. - * - * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system - * path. - */ -public class GridHadoopPopularWordsTest { - /** Ignite home. */ - private static final String IGNITE_HOME = U.getIgniteHome(); - - /** The path to the input directory. ALl files in that directory will be processed. */ - private static final Path BOOKS_LOCAL_DIR = - new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books"); - - /** The path to the output directory. THe result file will be written to this location. */ - private static final Path RESULT_LOCAL_DIR = - new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output"); - - /** Popular books source dir in DFS. */ - private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in"); - - /** Popular books source dir in DFS. */ - private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out"); - - /** Path to the distributed file system configuration. */ - private static final String DFS_CFG = "examples/config/filesystem/core-site.xml"; - - /** Top N words to select **/ - private static final int POPULAR_WORDS_CNT = 10; - - /** - * For each token in the input string the mapper emits a {word, 1} pair. - */ - private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> { - /** Constant value. */ - private static final IntWritable ONE = new IntWritable(1); - - /** The word converted into the Text. */ - private Text word = new Text(); - - /** - * Emits a entry where the key is the word and the value is always 1. - * - * @param key the current position in the input file (not used here) - * @param val the text string - * @param ctx mapper context - * @throws IOException - * @throws InterruptedException - */ - @Override protected void map(LongWritable key, Text val, Context ctx) - throws IOException, InterruptedException { - // Get the mapped object. - final String line = val.toString(); - - // Splits the given string to words. - final String[] words = line.split("[^a-zA-Z0-9]"); - - for (final String w : words) { - // Only emit counts for longer words. - if (w.length() <= 3) - continue; - - word.set(w); - - // Write the word into the context with the initial count equals 1. - ctx.write(word, ONE); - } - } - } - - /** - * The reducer uses a priority queue to rank the words based on its number of occurrences. - */ - private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - private MinMaxPriorityQueue<Entry<Integer, String>> q; - - TopNWordsReducer() { - q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() { - @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) { - return o1.getKey().compareTo(o2.getKey()); - } - })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create(); - } - - /** - * This method doesn't emit anything, but just keeps track of the top N words. - * - * @param key The word. - * @param vals The words counts. - * @param ctx Reducer context. - * @throws IOException If failed. - * @throws InterruptedException If failed. - */ - @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, - InterruptedException { - int sum = 0; - - for (IntWritable val : vals) - sum += val.get(); - - q.add(immutableEntry(sum, key.toString())); - } - - /** - * This method is called after all the word entries have been processed. It writes the accumulated - * statistics to the job output file. - * - * @param ctx The job context. - * @throws IOException If failed. - * @throws InterruptedException If failed. - */ - @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { - IntWritable i = new IntWritable(); - - Text txt = new Text(); - - // iterate in desc order - while (!q.isEmpty()) { - Entry<Integer, String> e = q.removeFirst(); - - i.set(e.getKey()); - - txt.set(e.getValue()); - - ctx.write(txt, i); - } - } - } - - /** - * Configures the Hadoop MapReduce job. - * - * @return Instance of the Hadoop MapRed job. - * @throws IOException If failed. - */ - private Job createConfigBasedHadoopJob() throws IOException { - Job jobCfg = new Job(); - - Configuration cfg = jobCfg.getConfiguration(); - - // Use explicit configuration of distributed file system, if provided. - if (DFS_CFG != null) - cfg.addResource(U.resolveIgniteUrl(DFS_CFG)); - - jobCfg.setJobName("HadoopPopularWordExample"); - jobCfg.setJarByClass(GridHadoopPopularWordsTest.class); - jobCfg.setInputFormatClass(TextInputFormat.class); - jobCfg.setOutputKeyClass(Text.class); - jobCfg.setOutputValueClass(IntWritable.class); - jobCfg.setMapperClass(TokenizingMapper.class); - jobCfg.setReducerClass(TopNWordsReducer.class); - - FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR); - FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR); - - // Local job tracker allows the only task per wave, but text input format - // replaces it with the calculated value based on input split size option. - if ("local".equals(cfg.get("mapred.job.tracker", "local"))) { - // Split job into tasks using 32MB split size. - FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024); - FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE); - } - - return jobCfg; - } - - /** - * Runs the Hadoop job. - * - * @return {@code True} if succeeded, {@code false} otherwise. - * @throws Exception If failed. - */ - private boolean runWordCountConfigBasedHadoopJob() throws Exception { - Job job = createConfigBasedHadoopJob(); - - // Distributed file system this job will work with. - FileSystem fs = FileSystem.get(job.getConfiguration()); - - X.println(">>> Using distributed file system: " + fs.getHomeDirectory()); - - // Prepare input and output job directories. - prepareDirectories(fs); - - long time = System.currentTimeMillis(); - - // Run job. - boolean res = job.waitForCompletion(true); - - X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec."); - - // Move job results into local file system, so you can view calculated results. - publishResults(fs); - - return res; - } - - /** - * Prepare job's data: cleanup result directories that might have left over - * after previous runs, copy input files from the local file system into DFS. - * - * @param fs Distributed file system to use in job. - * @throws IOException If failed. - */ - private void prepareDirectories(FileSystem fs) throws IOException { - X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR); - - fs.delete(RESULT_DFS_DIR, true); - - X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); - - fs.delete(BOOKS_DFS_DIR, true); - - X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR); - - fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR); - } - - /** - * Publish job execution results into local file system, so you can view them. - * - * @param fs Distributed file sytem used in job. - * @throws IOException If failed. - */ - private void publishResults(FileSystem fs) throws IOException { - X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); - - fs.delete(BOOKS_DFS_DIR, true); - - X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR); - - fs.delete(RESULT_LOCAL_DIR, true); - - X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR); - - fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR); - } - - /** - * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of - * occurrences of the word in the source files, the N most popular words are selected. - * - * @param args None. - */ - public static void main(String[] args) { - try { - new GridHadoopPopularWordsTest().runWordCountConfigBasedHadoopJob(); - } - catch (Exception e) { - X.println(">>> Failed to run word count example: " + e.getMessage()); - } - - System.exit(0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java deleted file mode 100644 index 689fb58..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jdk8.backport.*; - -import java.util.concurrent.*; - -/** - * For tests. - */ -public class GridHadoopSharedMap { - /** */ - private static final ConcurrentMap<String, GridHadoopSharedMap> maps = new ConcurrentHashMap8<>(); - - /** */ - private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>(); - - /** - * Private. - */ - private GridHadoopSharedMap() { - // No-op. - } - - /** - * Puts object by key. - * - * @param key Key. - * @param val Value. - */ - public <T> T put(String key, T val) { - Object old = map.putIfAbsent(key, val); - - return old == null ? val : (T)old; - } - - /** - * @param cls Class. - * @return Map of static fields. - */ - public static GridHadoopSharedMap map(Class<?> cls) { - GridHadoopSharedMap m = maps.get(cls.getName()); - - if (m != null) - return m; - - GridHadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new GridHadoopSharedMap()); - - return old == null ? m : old; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java deleted file mode 100644 index bdc884b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; -import org.apache.ignite.internal.util.typedef.*; - -/** - * Hadoop node startup. - */ -public class GridHadoopStartup { - /** - * @param args Arguments. - */ - public static void main(String[] args) { - G.start("config/hadoop/default-config.xml"); - } - - /** - * @return Configuration for job run. - */ - @SuppressWarnings("UnnecessaryFullyQualifiedName") - public static Configuration configuration() { - Configuration cfg = new Configuration(); - - cfg.set("fs.defaultFS", "igfs://igfs@localhost"); - - cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); - cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName()); - - cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); - - cfg.set("mapreduce.framework.name", "ignite"); - cfg.set("mapreduce.jobtracker.address", "localhost:11211"); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java deleted file mode 100644 index a75605b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Round-robin mr planner. - */ -public class GridHadoopTestRoundRobinMrPlanner implements GridHadoopMapReducePlanner { - /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { - if (top.isEmpty()) - throw new IllegalArgumentException("Topology is empty"); - - // Has at least one element. - Iterator<ClusterNode> it = top.iterator(); - - Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - - for (HadoopInputSplit block : job.input()) { - ClusterNode node = it.next(); - - Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); - - if (nodeBlocks == null) { - nodeBlocks = new ArrayList<>(); - - mappers.put(node.id(), nodeBlocks); - } - - nodeBlocks.add(block); - - if (!it.hasNext()) - it = top.iterator(); - } - - int[] rdc = new int[job.info().reducers()]; - - for (int i = 0; i < rdc.length; i++) - rdc[i] = i; - - return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java deleted file mode 100644 index cdbb809..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -import static org.junit.Assert.*; - -/** - * Utility class for tests. - */ -public class GridHadoopTestUtils { - /** - * Checks that job statistics file contains valid strings only. - * - * @param reader Buffered reader to get lines of job statistics. - * @return Amount of events. - * @throws IOException If failed. - */ - public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { - Collection<String> phases = new HashSet<>(); - - phases.add("submit"); - phases.add("prepare"); - phases.add("start"); - phases.add("finish"); - phases.add("requestId"); - phases.add("responseId"); - - Collection<String> evtTypes = new HashSet<>(); - - evtTypes.add("JOB"); - evtTypes.add("SETUP"); - evtTypes.add("MAP"); - evtTypes.add("SHUFFLE"); - evtTypes.add("REDUCE"); - evtTypes.add("COMBINE"); - evtTypes.add("COMMIT"); - - long evtCnt = 0; - String line; - - Map<Long, String> reduceNodes = new HashMap<>(); - - while((line = reader.readLine()) != null) { - String[] splitLine = line.split(":"); - - //Try parse timestamp - Long.parseLong(splitLine[1]); - - String[] evt = splitLine[0].split(" "); - - assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); - - String phase; - - if ("JOB".equals(evt[0])) - phase = evt[1]; - else { - assertEquals(4, evt.length); - assertTrue("The node id is not defined", !F.isEmpty(evt[3])); - - long taskNum = Long.parseLong(evt[1]); - - if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { - String nodeId = reduceNodes.get(taskNum); - - if (nodeId == null) - reduceNodes.put(taskNum, evt[3]); - else - assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); - } - - phase = evt[2]; - } - - assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); - - evtCnt++; - } - - return evtCnt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index 70bf0f2..a26ead5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -128,8 +128,8 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { * @param gridName Grid name. * @return Hadoop configuration. */ - public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = new GridHadoopConfiguration(); + public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = new HadoopConfiguration(); cfg.setMaxParallelTasks(3); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java index 733ed01..33fa358 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -327,7 +327,7 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest { assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); - long evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); + long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index e072592..77e20fe 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -74,7 +74,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes private static final IgniteFs IGFS = new MockIgfs(); /** Planner. */ - private static final GridHadoopMapReducePlanner PLANNER = new HadoopDefaultMapReducePlanner(); + private static final HadoopMapReducePlanner PLANNER = new HadoopDefaultMapReducePlanner(); /** Block locations. */ private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>(); @@ -83,7 +83,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>(); /** Last created plan. */ - private static final ThreadLocal<GridHadoopMapReducePlan> PLAN = new ThreadLocal<>(); + private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>(); /** * @@ -400,7 +400,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes * @return Plan. * @throws IgniteCheckedException If failed. */ - private static GridHadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { + private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { assert reducers > 0; assert splits != null && splits.length > 0; @@ -422,7 +422,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes top.add(node2); top.add(node3); - GridHadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); + HadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); PLAN.set(plan); @@ -605,12 +605,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public GridHadoopJobId id() { + @Override public HadoopJobId id() { return null; } /** {@inheritDoc} */ - @Override public GridHadoopJobInfo info() { + @Override public HadoopJobInfo info() { return new HadoopDefaultJobInfo() { @Override public int reducers() { return reducers; @@ -624,7 +624,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { return null; } @@ -639,12 +639,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { // No-op. } /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java index a6c29e9..e385ca7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -36,7 +37,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest { private static final String PATH_OUTPUT = "/test-out"; /** */ - private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(HadoopGroupingTest.class) + private static final GridConcurrentHashSet<UUID> vals = HadoopSharedMap.map(HadoopGroupingTest.class) .put("vals", new GridConcurrentHashSet<UUID>()); /** {@inheritDoc} */ @@ -60,8 +61,8 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setExternalExecution(false); @@ -110,7 +111,7 @@ public class HadoopGroupingTest extends HadoopAbstractSelfTest { job.setGroupingComparatorClass(YearComparator.class); } - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), createJobInfo(job.getConfiguration())).get(30000); assertTrue(vals.isEmpty()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java index ed6d0a0..943d89f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -43,7 +44,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { private static final int BLOCK_CNT = 10; /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopJobTrackerSelfTest.class); + private static HadoopSharedMap m = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class); /** Map task execution count. */ private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); @@ -91,10 +92,10 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); - cfg.setMapReducePlanner(new GridHadoopTestRoundRobinMrPlanner()); + cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner()); cfg.setExternalExecution(false); return cfg; @@ -116,7 +117,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); + HadoopJobId jobId = new HadoopJobId(globalId, 1); grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); @@ -163,7 +164,7 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); + HadoopJobId jobId = new HadoopJobId(globalId, 1); grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); @@ -213,13 +214,13 @@ public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { * @param complete Completion status. * @throws Exception If failed. */ - private void checkStatus(GridHadoopJobId jobId, boolean complete) throws Exception { + private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception { for (int i = 0; i < gridCount(); i++) { IgniteKernal kernal = (IgniteKernal)grid(i); Hadoop hadoop = kernal.hadoop(); - GridHadoopJobStatus stat = hadoop.status(jobId); + HadoopJobStatus stat = hadoop.status(jobId); assert stat != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java index f86c608..4a6e1ef 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.examples.*; @@ -38,12 +39,12 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; */ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { /** */ - private static Map<String, Boolean> flags = GridHadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) + private static Map<String, Boolean> flags = HadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) .put("flags", new HashMap<String, Boolean>()); /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setExternalExecution(false); @@ -60,7 +61,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { igfs.mkdirs(inDir); - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, "key6", 18000 ); @@ -88,7 +89,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { // File system coordinates. setupFileSystems(jobConf); - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); + HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); if (!useNewAPI) { jobConf.setPartitionerClass(CustomV1Partitioner.class); @@ -98,7 +99,7 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { Job job = Job.getInstance(jobConf); - GridHadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI); + HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI); if (useNewAPI) { job.setPartitionerClass(CustomV2Partitioner.class); @@ -114,9 +115,9 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { job.setNumReduceTasks(3); - job.setJarByClass(GridHadoopWordCount2.class); + job.setJarByClass(HadoopWordCount2.class); - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(job.getConfiguration())); fut.get();