http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java index 2d64277..3f574e9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.hadoop.jobtracker; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -26,7 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*; /** * Hadoop job metadata. Internal object used for distributed job state tracking. @@ -36,16 +37,16 @@ public class HadoopJobMetadata implements Externalizable { private static final long serialVersionUID = 0L; /** Job ID. */ - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** Job info. */ - private GridHadoopJobInfo jobInfo; + private HadoopJobInfo jobInfo; /** Node submitted job. */ private UUID submitNodeId; /** Map-reduce plan. */ - private GridHadoopMapReducePlan mrPlan; + private HadoopMapReducePlan mrPlan; /** Pending splits for which mapper should be executed. */ private Map<HadoopInputSplit, Integer> pendingSplits; @@ -58,7 +59,7 @@ public class HadoopJobMetadata implements Externalizable { private Map<Integer, HadoopProcessDescriptor> reducersAddrs; /** Job phase. */ - private GridHadoopJobPhase phase = PHASE_SETUP; + private HadoopJobPhase phase = PHASE_SETUP; /** Fail cause. */ @GridToStringExclude @@ -68,7 +69,7 @@ public class HadoopJobMetadata implements Externalizable { private long ver; /** Job counters */ - private GridHadoopCounters counters = new HadoopCountersImpl(); + private HadoopCounters counters = new HadoopCountersImpl(); /** * Empty constructor required by {@link Externalizable}. @@ -84,7 +85,7 @@ public class HadoopJobMetadata implements Externalizable { * @param jobId Job ID. * @param jobInfo Job info. */ - public HadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { this.jobId = jobId; this.jobInfo = jobInfo; this.submitNodeId = submitNodeId; @@ -120,14 +121,14 @@ public class HadoopJobMetadata implements Externalizable { /** * @param phase Job phase. */ - public void phase(GridHadoopJobPhase phase) { + public void phase(HadoopJobPhase phase) { this.phase = phase; } /** * @return Job phase. */ - public GridHadoopJobPhase phase() { + public HadoopJobPhase phase() { return phase; } @@ -188,14 +189,14 @@ public class HadoopJobMetadata implements Externalizable { /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } /** * @param mrPlan Map-reduce plan. */ - public void mapReducePlan(GridHadoopMapReducePlan mrPlan) { + public void mapReducePlan(HadoopMapReducePlan mrPlan) { assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; this.mrPlan = mrPlan; @@ -204,14 +205,14 @@ public class HadoopJobMetadata implements Externalizable { /** * @return Map-reduce plan. */ - public GridHadoopMapReducePlan mapReducePlan() { + public HadoopMapReducePlan mapReducePlan() { return mrPlan; } /** * @return Job info. */ - public GridHadoopJobInfo jobInfo() { + public HadoopJobInfo jobInfo() { return jobInfo; } @@ -220,7 +221,7 @@ public class HadoopJobMetadata implements Externalizable { * * @return Collection of counters. */ - public GridHadoopCounters counters() { + public HadoopCounters counters() { return counters; } @@ -229,7 +230,7 @@ public class HadoopJobMetadata implements Externalizable { * * @param counters Collection of counters. */ - public void counters(GridHadoopCounters counters) { + public void counters(HadoopCounters counters) { this.counters = counters; } @@ -284,16 +285,16 @@ public class HadoopJobMetadata implements Externalizable { @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { submitNodeId = U.readUuid(in); - jobId = (GridHadoopJobId)in.readObject(); - jobInfo = (GridHadoopJobInfo)in.readObject(); - mrPlan = (GridHadoopMapReducePlan)in.readObject(); + jobId = (HadoopJobId)in.readObject(); + jobInfo = (HadoopJobInfo)in.readObject(); + mrPlan = (HadoopMapReducePlan)in.readObject(); pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject(); pendingReducers = (Collection<Integer>)in.readObject(); - phase = (GridHadoopJobPhase)in.readObject(); + phase = (HadoopJobPhase)in.readObject(); failCause = (Throwable)in.readObject(); ver = in.readLong(); reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject(); - counters = (GridHadoopCounters)in.readObject(); + counters = (HadoopCounters)in.readObject(); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index 99a759d..39f42b2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; import org.apache.ignite.internal.util.*; @@ -44,7 +45,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*; import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*; @@ -56,23 +57,23 @@ public class HadoopJobTracker extends HadoopComponent { private final GridMutex mux = new GridMutex(); /** */ - private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaPrj; + private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaPrj; /** Projection with expiry policy for finished job updates. */ - private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; + private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj; /** Map-reduce execution planner. */ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private GridHadoopMapReducePlanner mrPlanner; + private HadoopMapReducePlanner mrPlanner; /** All the known jobs. */ - private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>(); + private final ConcurrentMap<HadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>(); /** Locally active jobs. */ - private final ConcurrentMap<GridHadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); + private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); /** Locally requested finish futures. */ - private final ConcurrentMap<GridHadoopJobId, GridFutureAdapter<GridHadoopJobId>> activeFinishFuts = + private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts = new ConcurrentHashMap8<>(); /** Event processing service. */ @@ -106,8 +107,8 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job meta projection. */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaCache() { - GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; + private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() { + GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj; if (prj == null) { synchronized (mux) { @@ -128,8 +129,8 @@ public class HadoopJobTracker extends HadoopComponent { throw new IllegalStateException(e); } - jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata>) - sysCache.projection(GridHadoopJobId.class, HadoopJobMetadata.class); + jobMetaPrj = prj = (GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata>) + sysCache.projection(HadoopJobId.class, HadoopJobMetadata.class); if (ctx.configuration().getFinishedJobInfoTtl() > 0) { ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( @@ -149,8 +150,8 @@ public class HadoopJobTracker extends HadoopComponent { /** * @return Projection with expiry policy for finished job updates. */ - private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaCache() { - GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj; + private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() { + GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj; if (prj == null) { jobMetaCache(); @@ -169,8 +170,8 @@ public class HadoopJobTracker extends HadoopComponent { super.onKernalStart(); jobMetaCache().context().continuousQueries().executeInternalQuery( - new CacheEntryUpdatedListener<GridHadoopJobId, HadoopJobMetadata>() { - @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, + new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() { + @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> evts) { if (!busyLock.tryReadLock()) return; @@ -222,7 +223,7 @@ public class HadoopJobTracker extends HadoopComponent { evtProcSvc.shutdown(); // Fail all pending futures. - for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values()) + for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values()) fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); } @@ -234,7 +235,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job completion future. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<GridHadoopJobId> submit(GridHadoopJobId jobId, GridHadoopJobInfo info) { + public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) { if (!busyLock.tryReadLock()) { return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " + "(grid is stopping): " + info)); @@ -248,7 +249,7 @@ public class HadoopJobTracker extends HadoopComponent { HadoopJob job = job(jobId, info); - GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); + HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info); @@ -257,9 +258,9 @@ public class HadoopJobTracker extends HadoopComponent { meta.pendingSplits(allSplits(mrPlan)); meta.pendingReducers(allReducers(mrPlan)); - GridFutureAdapter<GridHadoopJobId> completeFut = new GridFutureAdapter<>(); + GridFutureAdapter<HadoopJobId> completeFut = new GridFutureAdapter<>(); - GridFutureAdapter<GridHadoopJobId> old = activeFinishFuts.put(jobId, completeFut); + GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut); assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']'; @@ -297,10 +298,10 @@ public class HadoopJobTracker extends HadoopComponent { * @return Status. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public static GridHadoopJobStatus status(HadoopJobMetadata meta) { - GridHadoopJobInfo jobInfo = meta.jobInfo(); + public static HadoopJobStatus status(HadoopJobMetadata meta) { + HadoopJobInfo jobInfo = meta.jobInfo(); - return new GridHadoopJobStatus( + return new HadoopJobStatus( meta.jobId(), jobInfo.jobName(), jobInfo.user(), @@ -320,7 +321,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job ID to get status for. * @return Job status for given job ID or {@code null} if job was not found. */ - @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; // Grid is stopping. @@ -341,7 +342,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Finish future or {@code null}. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; // Grid is stopping. @@ -361,8 +362,8 @@ public class HadoopJobTracker extends HadoopComponent { return new GridFinishedFutureEx<>(jobId, meta.failCause()); } - GridFutureAdapter<GridHadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId, - new GridFutureAdapter<GridHadoopJobId>()); + GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId, + new GridFutureAdapter<HadoopJobId>()); // Get meta from cache one more time to close the window. meta = jobMetaCache().get(jobId); @@ -395,7 +396,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job plan. * @throws IgniteCheckedException If failed. */ - public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws IgniteCheckedException { + public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; @@ -419,7 +420,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. */ @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - public void onTaskFinished(GridHadoopTaskInfo info, HadoopTaskStatus status) { + public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) { if (!busyLock.tryReadLock()) return; @@ -470,7 +471,7 @@ public class HadoopJobTracker extends HadoopComponent { case COMMIT: case ABORT: { - GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache(); + GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache(); cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). listenAsync(failsLog); @@ -488,7 +489,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job id. * @param c Closure of operation. */ - private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void> c) { + private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) { jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); } @@ -499,7 +500,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param reducers Reducers. * @param desc Process descriptor. */ - public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers, + public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers, HadoopProcessDescriptor desc) { transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); } @@ -511,7 +512,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Collection of all input splits that should be processed. */ @SuppressWarnings("ConstantConditions") - private Map<HadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) { + private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) { Map<HadoopInputSplit, Integer> res = new HashMap<>(); int taskNum = 0; @@ -532,7 +533,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param plan Map-reduce plan. * @return Collection of reducers. */ - private Collection<Integer> allReducers(GridHadoopMapReducePlan plan) { + private Collection<Integer> allReducers(HadoopMapReducePlan plan) { Collection<Integer> res = new HashSet<>(); for (int i = 0; i < plan.reducers(); i++) @@ -559,18 +560,18 @@ public class HadoopJobTracker extends HadoopComponent { for (Object metaObj : jobMetaCache().values()) { HadoopJobMetadata meta = (HadoopJobMetadata)metaObj; - GridHadoopJobId jobId = meta.jobId(); + HadoopJobId jobId = meta.jobId(); - GridHadoopMapReducePlan plan = meta.mapReducePlan(); + HadoopMapReducePlan plan = meta.mapReducePlan(); - GridHadoopJobPhase phase = meta.phase(); + HadoopJobPhase phase = meta.phase(); try { if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { // Failover setup task. HadoopJob job = job(jobId, meta.jobInfo()); - Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId); + Collection<HadoopTaskInfo> setupTask = setupTask(jobId); assert setupTask != null; @@ -626,12 +627,12 @@ public class HadoopJobTracker extends HadoopComponent { * @throws IgniteCheckedException If failed. */ private void processJobMetadataUpdates( - Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata>> updated) + Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated) throws IgniteCheckedException { UUID locNodeId = ctx.localNodeId(); - for (CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata> entry : updated) { - GridHadoopJobId jobId = entry.getKey(); + for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> entry : updated) { + HadoopJobId jobId = entry.getKey(); HadoopJobMetadata meta = entry.getValue(); if (meta == null || !ctx.isParticipating(meta)) @@ -661,7 +662,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job ID. * @param plan Map-reduce plan. */ - private void printPlan(GridHadoopJobId jobId, GridHadoopMapReducePlan plan) { + private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) { log.info("Plan for " + jobId); SB b = new SB(); @@ -689,18 +690,18 @@ public class HadoopJobTracker extends HadoopComponent { * @param locNodeId Local node ID. * @throws IgniteCheckedException If failed. */ - private void processJobMetaUpdate(GridHadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId) + private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId) throws IgniteCheckedException { JobLocalState state = activeJobs.get(jobId); HadoopJob job = job(jobId, meta.jobInfo()); - GridHadoopMapReducePlan plan = meta.mapReducePlan(); + HadoopMapReducePlan plan = meta.mapReducePlan(); switch (meta.phase()) { case PHASE_SETUP: { if (ctx.jobUpdateLeader()) { - Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId); + Collection<HadoopTaskInfo> setupTask = setupTask(jobId); if (setupTask != null) ctx.taskExecutor().run(job, setupTask); @@ -711,7 +712,7 @@ public class HadoopJobTracker extends HadoopComponent { case PHASE_MAP: { // Check if we should initiate new task on local node. - Collection<GridHadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta); + Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta); if (tasks != null) ctx.taskExecutor().run(job, tasks); @@ -721,7 +722,7 @@ public class HadoopJobTracker extends HadoopComponent { case PHASE_REDUCE: { if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) { - GridHadoopTaskInfo info = new GridHadoopTaskInfo(COMMIT, jobId, 0, 0, null); + HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null); if (log.isDebugEnabled()) log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId + @@ -732,7 +733,7 @@ public class HadoopJobTracker extends HadoopComponent { break; } - Collection<GridHadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job); + Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job); if (tasks != null) ctx.taskExecutor().run(job, tasks); @@ -756,7 +757,7 @@ public class HadoopJobTracker extends HadoopComponent { // Prevent running multiple abort tasks. if (state.onAborted()) { - GridHadoopTaskInfo info = new GridHadoopTaskInfo(ABORT, jobId, 0, 0, null); + HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null); if (log.isDebugEnabled()) log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId + @@ -811,7 +812,7 @@ public class HadoopJobTracker extends HadoopComponent { ctx.shuffle().jobFinished(jobId); } - GridFutureAdapter<GridHadoopJobId> finishFut = activeFinishFuts.remove(jobId); + GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId); if (finishFut != null) { if (log.isDebugEnabled()) @@ -838,7 +839,7 @@ public class HadoopJobTracker extends HadoopComponent { HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance(); - GridHadoopCounters cntrs = meta.counters(); + HadoopCounters cntrs = meta.counters(); writer.write(job.info(), jobId, cntrs); } @@ -862,13 +863,13 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job ID. * @return Setup task wrapped in collection. */ - @Nullable private Collection<GridHadoopTaskInfo> setupTask(GridHadoopJobId jobId) { + @Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) { if (activeJobs.containsKey(jobId)) return null; else { initState(jobId); - return Collections.singleton(new GridHadoopTaskInfo(SETUP, jobId, 0, 0, null)); + return Collections.singleton(new HadoopTaskInfo(SETUP, jobId, 0, 0, null)); } } @@ -879,13 +880,13 @@ public class HadoopJobTracker extends HadoopComponent { * @param meta Job metadata. * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. */ - private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) { + private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) { UUID locNodeId = ctx.localNodeId(); - GridHadoopJobId jobId = meta.jobId(); + HadoopJobId jobId = meta.jobId(); JobLocalState state = activeJobs.get(jobId); - Collection<GridHadoopTaskInfo> tasks = null; + Collection<HadoopTaskInfo> tasks = null; if (mappers != null) { if (state == null) @@ -897,7 +898,7 @@ public class HadoopJobTracker extends HadoopComponent { log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + ", split=" + split + ']'); - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); if (tasks == null) tasks = new ArrayList<>(); @@ -917,13 +918,13 @@ public class HadoopJobTracker extends HadoopComponent { * @param job Job instance. * @return Collection of task infos. */ - private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) { + private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) { UUID locNodeId = ctx.localNodeId(); - GridHadoopJobId jobId = job.id(); + HadoopJobId jobId = job.id(); JobLocalState state = activeJobs.get(jobId); - Collection<GridHadoopTaskInfo> tasks = null; + Collection<HadoopTaskInfo> tasks = null; if (reducers != null) { if (state == null) @@ -935,7 +936,7 @@ public class HadoopJobTracker extends HadoopComponent { log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId + ", rdc=" + rdc + ']'); - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(REDUCE, jobId, rdc, 0, null); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null); if (tasks == null) tasks = new ArrayList<>(); @@ -954,7 +955,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param jobId Job ID. * @return Local state. */ - private JobLocalState initState(GridHadoopJobId jobId) { + private JobLocalState initState(HadoopJobId jobId) { return F.addIfAbsent(activeJobs, jobId, new JobLocalState()); } @@ -966,7 +967,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job. * @throws IgniteCheckedException If failed. */ - @Nullable public HadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { + @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException { GridFutureAdapterEx<HadoopJob> fut = jobs.get(jobId); if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<HadoopJob>())) != null) @@ -1019,7 +1020,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return {@code True} if job was killed. * @throws IgniteCheckedException If failed. */ - public boolean killJob(GridHadoopJobId jobId) throws IgniteCheckedException { + public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return false; // Grid is stopping. @@ -1058,7 +1059,7 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job counters or {@code null} if job cannot be found. * @throws IgniteCheckedException If failed. */ - @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException { if (!busyLock.tryReadLock()) return null; @@ -1158,8 +1159,8 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onSetupFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); + private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { + final HadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) transform(jobId, new CancelJobProcessor(prev, status.failCause())); @@ -1172,9 +1173,9 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onMapFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, + private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, final StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); + final HadoopJobId jobId = taskInfo.jobId(); boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); @@ -1213,8 +1214,8 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onReduceFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { - GridHadoopJobId jobId = taskInfo.jobId(); + private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) { + HadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) // Fail the whole job. transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); @@ -1227,9 +1228,9 @@ public class HadoopJobTracker extends HadoopComponent { * @param status Task status. * @param prev Previous closure. */ - private void onCombineFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, + private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, final StackedProcessor prev) { - final GridHadoopJobId jobId = taskInfo.jobId(); + final HadoopJobId jobId = taskInfo.jobId(); if (status.state() == FAILED || status.state() == CRASHED) // Fail the whole job. @@ -1289,13 +1290,13 @@ public class HadoopJobTracker extends HadoopComponent { private static final long serialVersionUID = 0L; /** Phase to update. */ - private final GridHadoopJobPhase phase; + private final HadoopJobPhase phase; /** * @param prev Previous closure. * @param phase Phase to update. */ - private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) { + private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) { super(prev); this.phase = phase; @@ -1545,13 +1546,13 @@ public class HadoopJobTracker extends HadoopComponent { private static final long serialVersionUID = 0L; /** */ - private final GridHadoopCounters counters; + private final HadoopCounters counters; /** * @param prev Previous closure. * @param counters Task counters to add into job counters. */ - private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) { + private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) { super(prev); assert counters != null; @@ -1561,7 +1562,7 @@ public class HadoopJobTracker extends HadoopComponent { /** {@inheritDoc} */ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - GridHadoopCounters cntrs = new HadoopCountersImpl(cp.counters()); + HadoopCounters cntrs = new HadoopCountersImpl(cp.counters()); cntrs.merge(counters); @@ -1573,7 +1574,7 @@ public class HadoopJobTracker extends HadoopComponent { * Abstract stacked closure. */ private abstract static class StackedProcessor implements - EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void>, Serializable { + EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable { /** */ private static final long serialVersionUID = 0L; @@ -1588,7 +1589,7 @@ public class HadoopJobTracker extends HadoopComponent { } /** {@inheritDoc} */ - @Override public Void process(MutableEntry<GridHadoopJobId, HadoopJobMetadata> e, Object... args) { + @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) { HadoopJobMetadata val = apply(e.getValue()); if (val != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java index f24e8f2..1413612 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java @@ -25,7 +25,7 @@ import java.util.*; /** * Map-reduce plan. */ -public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { +public class HadoopDefaultMapReducePlan implements HadoopMapReducePlan { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java index 6e6e874..b0a8ce96 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java @@ -36,7 +36,7 @@ import static org.apache.ignite.IgniteFs.*; /** * Default map-reduce planner implementation. */ -public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner { +public class HadoopDefaultMapReducePlanner implements HadoopMapReducePlanner { /** Injected grid. */ @IgniteInstanceResource private Ignite ignite; @@ -47,8 +47,8 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner private IgniteLogger log; /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { // Convert collection of topology nodes to collection of topology node IDs. Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java index 3a766c3..b454760 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java @@ -33,6 +33,7 @@ import org.apache.hadoop.security.token.*; import org.apache.ignite.*; import org.apache.ignite.internal.client.*; import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; @@ -62,7 +63,7 @@ public class HadoopClientProtocol implements ClientProtocol { private long lastVer = -1; /** Last received status. */ - private GridHadoopJobStatus lastStatus; + private HadoopJobStatus lastStatus; /** * Constructor. @@ -82,7 +83,7 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - GridHadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); + HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); @@ -99,7 +100,7 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); - GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), + HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); if (status == null) @@ -157,13 +158,13 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException { try { - Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); + Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); HadoopProtocolTaskArguments args = delay >= 0 ? new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); - GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); + HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); if (status == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); @@ -178,13 +179,13 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { try { - final GridHadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), + final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); if (counters == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); - return new HadoopCounters(counters); + return new HadoopMapReduceCounters(counters); } catch (GridClientException e) { throw new IOException("Failed to get job counters: " + jobId, e); @@ -312,7 +313,7 @@ public class HadoopClientProtocol implements ClientProtocol { * @param status Ignite status. * @return Hadoop status. */ - private JobStatus processStatus(GridHadoopJobStatus status) { + private JobStatus processStatus(HadoopJobStatus status) { // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java index 6625d7d..ebdda9f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java @@ -20,18 +20,19 @@ package org.apache.ignite.internal.processors.hadoop.proto; import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import java.util.*; /** * Task to get job counters. */ -public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<GridHadoopCounters> { +public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop, + @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); @@ -40,6 +41,6 @@ public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<Gri assert nodeId != null; assert id != null; - return hadoop.counters(new GridHadoopJobId(nodeId, id)); + return hadoop.counters(new HadoopJobId(nodeId, id)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java index 0714eb1..1734562 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java @@ -29,7 +29,7 @@ import java.util.*; /** * Job status task. */ -public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> { +public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> { /** */ private static final long serialVersionUID = 0L; @@ -40,7 +40,7 @@ public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridH private static final String ATTR_HELD = "held"; /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop, + @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); @@ -49,7 +49,7 @@ public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridH assert nodeId != null; assert id != null; - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + HadoopJobId jobId = new HadoopJobId(nodeId, id); if (pollDelay == null) pollDelay = DFLT_POLL_DELAY; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java index fc0e484..d173612 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java @@ -39,7 +39,7 @@ public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean assert nodeId != null; assert id != null; - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + HadoopJobId jobId = new HadoopJobId(nodeId, id); return hadoop.kill(jobId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java index e30feb7..2782530 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java @@ -23,12 +23,12 @@ import org.apache.ignite.internal.processors.hadoop.*; /** * Task to get the next job ID. */ -public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<GridHadoopJobId> { +public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop, + @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) { return hadoop.nextJobId(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java index 1da4b58..f65d9bb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java @@ -23,17 +23,17 @@ import org.apache.ignite.internal.processors.hadoop.*; import java.util.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*; /** * Submit job task. */ -public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> { +public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop, + @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); @@ -43,14 +43,14 @@ public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridH assert id != null; assert info != null; - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + HadoopJobId jobId = new HadoopJobId(nodeId, id); hadoop.submit(jobId, info); - GridHadoopJobStatus res = hadoop.status(jobId); + HadoopJobStatus res = hadoop.status(jobId); if (res == null) // Submission failed. - res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); + res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index f3c7837..422d941 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -37,7 +37,7 @@ import java.util.concurrent.*; */ public class HadoopShuffle extends HadoopComponent { /** */ - private final ConcurrentMap<GridHadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); /** */ protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); @@ -79,8 +79,8 @@ public class HadoopShuffle extends HadoopComponent { * @return Created shuffle job. * @throws IgniteCheckedException If job creation failed. */ - private HadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); + private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException { + HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); @@ -117,7 +117,7 @@ public class HadoopShuffle extends HadoopComponent { * @param jobId Task info. * @return Shuffle job. */ - private HadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { + private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException { HadoopShuffleJob<UUID> res = jobs.get(jobId); if (res == null) { @@ -214,7 +214,7 @@ public class HadoopShuffle extends HadoopComponent { /** * @param jobId Job id. */ - public void jobFinished(GridHadoopJobId jobId) { + public void jobFinished(HadoopJobId jobId) { HadoopShuffleJob job = jobs.remove(jobId); if (job != null) { @@ -233,7 +233,7 @@ public class HadoopShuffle extends HadoopComponent { * @param jobId Job ID. * @return Future. */ - public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) { + public IgniteInternalFuture<?> flush(HadoopJobId jobId) { HadoopShuffleJob job = jobs.get(jobId); if (job == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java index 53ff2d1..49cbd65 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java @@ -37,7 +37,7 @@ public class HadoopShuffleAck implements HadoopMessage { /** */ @GridToStringInclude - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** * @@ -49,7 +49,7 @@ public class HadoopShuffleAck implements HadoopMessage { /** * @param msgId Message ID. */ - public HadoopShuffleAck(long msgId, GridHadoopJobId jobId) { + public HadoopShuffleAck(long msgId, HadoopJobId jobId) { assert jobId != null; this.msgId = msgId; @@ -66,7 +66,7 @@ public class HadoopShuffleAck implements HadoopMessage { /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } @@ -78,7 +78,7 @@ public class HadoopShuffleAck implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); + jobId = new HadoopJobId(); jobId.readExternal(in); msgId = in.readLong(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 3dab6eb..7ae52df 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -36,7 +36,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*; import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; /** @@ -107,7 +107,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); reducersCtx.put(rdc, job.getTaskContext(taskInfo)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java index d227e75..c350552a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java @@ -50,7 +50,7 @@ public class HadoopShuffleMessage implements HadoopMessage { /** */ @GridToStringInclude - private GridHadoopJobId jobId; + private HadoopJobId jobId; /** */ @GridToStringInclude @@ -73,7 +73,7 @@ public class HadoopShuffleMessage implements HadoopMessage { /** * @param size Size. */ - public HadoopShuffleMessage(GridHadoopJobId jobId, int reducer, int size) { + public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) { assert jobId != null; buf = new byte[size]; @@ -94,7 +94,7 @@ public class HadoopShuffleMessage implements HadoopMessage { /** * @return Job ID. */ - public GridHadoopJobId jobId() { + public HadoopJobId jobId() { return jobId; } @@ -206,7 +206,7 @@ public class HadoopShuffleMessage implements HadoopMessage { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); + jobId = new HadoopJobId(); jobId.readExternal(in); msgId = in.readLong(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java index 82da910..65d9268 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java @@ -56,7 +56,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase { * @param mem Memory. * @param cap Initial capacity. */ - public HadoopConcurrentHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { + public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { super(jobInfo, mem); assert U.isPow2(cap); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java index fcf8e17..f524bdc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java @@ -37,7 +37,7 @@ public class HadoopHashMultimap extends HadoopHashMultimapBase { * @param mem Memory. * @param cap Initial capacity. */ - public HadoopHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { + public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { super(jobInfo, mem); assert U.isPow2(cap) : cap; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java index c464fd1..16aa673 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java @@ -31,7 +31,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { * @param jobInfo Job info. * @param mem Memory. */ - protected HadoopHashMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) { + protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { super(jobInfo, mem); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java index 5afcbc9..7f332aa 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -28,7 +28,7 @@ import java.io.*; import java.util.*; import java.util.concurrent.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*; /** * Base class for all multimaps. @@ -47,7 +47,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { * @param jobInfo Job info. * @param mem Memory. */ - protected HadoopMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) { + protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { assert jobInfo != null; assert mem != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java index c7bcda9..69aa7a7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java @@ -48,7 +48,7 @@ public class HadoopSkipList extends HadoopMultimapBase { * @param jobInfo Job info. * @param mem Memory. */ - public HadoopSkipList(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) { + public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { super(jobInfo, mem); heads = mem.allocate(HEADS_SIZE, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java index 9858e12..a3c20d8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java @@ -35,7 +35,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { private HadoopJobTracker jobTracker; /** */ - private final ConcurrentMap<GridHadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); /** Executor service to run tasks. */ private HadoopExecutorService exec; @@ -56,7 +56,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { exec.shutdown(3000); if (cancel) { - for (GridHadoopJobId jobId : jobs.keySet()) + for (HadoopJobId jobId : jobs.keySet()) cancelTasks(jobId); } } @@ -69,7 +69,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { + @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + ", tasksCnt=" + tasks.size() + ']'); @@ -86,7 +86,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks; - for (final GridHadoopTaskInfo info : tasks) { + for (final HadoopTaskInfo info : tasks) { assert info != null; HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info, @@ -126,7 +126,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { * * @param jobId Job ID to cancel. */ - @Override public void cancelTasks(GridHadoopJobId jobId) { + @Override public void cancelTasks(HadoopJobId jobId) { Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId); if (executedTasks != null) { @@ -137,7 +137,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { /** {@inheritDoc} */ @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { - if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { + if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); assert executedTasks == null || executedTasks.isEmpty(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java index d89d7d0..1c318e9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java @@ -170,7 +170,7 @@ public class HadoopExecutorService { String workerName; if (task instanceof HadoopRunnableTask) { - final GridHadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); + final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 4776321..2b36267 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -27,7 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; import java.util.concurrent.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*; import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; /** @@ -44,7 +44,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { private final HadoopJob job; /** Task to run. */ - private final GridHadoopTaskInfo info; + private final HadoopTaskInfo info; /** Submit time. */ private final long submitTs = U.currentTimeMillis(); @@ -74,7 +74,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { * @param info Task info. * @param nodeId Node id. */ - protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, + protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info, UUID nodeId) { this.nodeId = nodeId; this.log = log.getLogger(HadoopRunnableTask.class); @@ -120,7 +120,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { runTask(perfCntr); if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); + ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); try { runTask(perfCntr); @@ -262,7 +262,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** * @return Task info. */ - public GridHadoopTaskInfo taskInfo() { + public HadoopTaskInfo taskInfo() { return info; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java index c2002e6..39b4935 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java @@ -34,7 +34,7 @@ public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { * @param tasks Tasks. * @throws IgniteCheckedException If failed. */ - public abstract void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException; + public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException; /** * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks @@ -46,7 +46,7 @@ public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { * * @param jobId Job ID to cancel. */ - public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException; + public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException; /** * On job state change callback; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java index 490f0b2..c5ee16c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.taskexecutor; -import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -37,7 +37,7 @@ public class HadoopTaskStatus implements Externalizable { private Throwable failCause; /** */ - private GridHadoopCounters cntrs; + private HadoopCounters cntrs; /** * Default constructor required by {@link Externalizable}. @@ -64,7 +64,7 @@ public class HadoopTaskStatus implements Externalizable { * @param cntrs Task counters. */ public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, - @Nullable GridHadoopCounters cntrs) { + @Nullable HadoopCounters cntrs) { assert state != null; this.state = state; @@ -89,7 +89,7 @@ public class HadoopTaskStatus implements Externalizable { /** * @return Counters. */ - @Nullable public GridHadoopCounters counters() { + @Nullable public HadoopCounters counters() { return cntrs; } @@ -109,6 +109,6 @@ public class HadoopTaskStatus implements Externalizable { @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { state = (HadoopTaskState)in.readObject(); failCause = (Throwable)in.readObject(); - cntrs = (GridHadoopCounters)in.readObject(); + cntrs = (HadoopCounters)in.readObject(); } }