http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java new file mode 100644 index 0000000..91a2d6f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -0,0 +1,1625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.jobtracker; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import javax.cache.event.*; +import javax.cache.expiry.*; +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; +import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; +import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*; + +/** + * Hadoop job tracker. + */ +public class HadoopJobTracker extends HadoopComponent { + /** */ + private final GridMutex mux = new GridMutex(); + + /** */ + private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj; + + /** Projection with expiry policy for finished job updates. */ + private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj; + + /** Map-reduce execution planner. */ + @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") + private GridHadoopMapReducePlanner mrPlanner; + + /** All the known jobs. */ + private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<GridHadoopJob>> jobs = new ConcurrentHashMap8<>(); + + /** Locally active jobs. */ + private final ConcurrentMap<GridHadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); + + /** Locally requested finish futures. */ + private final ConcurrentMap<GridHadoopJobId, GridFutureAdapter<GridHadoopJobId>> activeFinishFuts = + new ConcurrentHashMap8<>(); + + /** Event processing service. */ + private ExecutorService evtProcSvc; + + /** Component busy lock. */ + private GridSpinReadWriteLock busyLock; + + /** Closure to check result of async transform of system cache. */ + private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> gridFut) { + try { + gridFut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to transform system cache.", e); + } + } + }; + + /** {@inheritDoc} */ + @Override public void start(HadoopContext ctx) throws IgniteCheckedException { + super.start(ctx); + + busyLock = new GridSpinReadWriteLock(); + + evtProcSvc = Executors.newFixedThreadPool(1); + } + + /** + * @return Job meta projection. + */ + @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") + private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj; + + if (prj == null) { + synchronized (mux) { + if ((prj = jobMetaPrj) == null) { + CacheProjection<Object, Object> sysCache = ctx.kernalContext().cache() + .cache(CU.SYS_CACHE_HADOOP_MR); + + assert sysCache != null; + + mrPlanner = ctx.planner(); + + try { + ctx.kernalContext().resource().injectGeneric(mrPlanner); + } + catch (IgniteCheckedException e) { // Must not happen. + U.error(log, "Failed to inject resources.", e); + + throw new IllegalStateException(e); + } + + jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>) + sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class); + + if (ctx.configuration().getFinishedJobInfoTtl() > 0) { + ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy( + new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl())); + + finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc); + } + else + finishedJobMetaPrj = jobMetaPrj; + } + } + } + + return prj; + } + + /** + * @return Projection with expiry policy for finished job updates. + */ + private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() { + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj; + + if (prj == null) { + jobMetaCache(); + + prj = finishedJobMetaPrj; + + assert prj != null; + } + + return prj; + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + jobMetaCache().context().continuousQueries().executeInternalQuery( + new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() { + @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId, + ? extends GridHadoopJobMetadata>> evts) { + if (!busyLock.tryReadLock()) + return; + + try { + // Must process query callback in a separate thread to avoid deadlocks. + evtProcSvc.submit(new EventHandler() { + @Override protected void body() throws IgniteCheckedException { + processJobMetadataUpdates(evts); + } + }); + } + finally { + busyLock.readUnlock(); + } + } + }, + null, + true, + true + ); + + ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(final Event evt) { + if (!busyLock.tryReadLock()) + return; + + try { + // Must process discovery callback in a separate thread to avoid deadlock. + evtProcSvc.submit(new EventHandler() { + @Override protected void body() { + processNodeLeft((DiscoveryEvent)evt); + } + }); + } + finally { + busyLock.readUnlock(); + } + } + }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + super.onKernalStop(cancel); + + busyLock.writeLock(); + + evtProcSvc.shutdown(); + + // Fail all pending futures. + for (GridFutureAdapter<GridHadoopJobId> fut : activeFinishFuts.values()) + fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).")); + } + + /** + * Submits execution of Hadoop job to grid. + * + * @param jobId Job ID. + * @param info Job info. + * @return Job completion future. + */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture<GridHadoopJobId> submit(GridHadoopJobId jobId, GridHadoopJobInfo info) { + if (!busyLock.tryReadLock()) { + return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " + + "(grid is stopping): " + info)); + } + + try { + long jobPrepare = U.currentTimeMillis(); + + if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) + throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); + + GridHadoopJob job = job(jobId, info); + + GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); + + GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info); + + meta.mapReducePlan(mrPlan); + + meta.pendingSplits(allSplits(mrPlan)); + meta.pendingReducers(allReducers(mrPlan)); + + GridFutureAdapter<GridHadoopJobId> completeFut = new GridFutureAdapter<>(); + + GridFutureAdapter<GridHadoopJobId> old = activeFinishFuts.put(jobId, completeFut); + + assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']'; + + if (log.isDebugEnabled()) + log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']'); + + long jobStart = U.currentTimeMillis(); + + GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(), + ctx.localNodeId()); + + perfCntr.clientSubmissionEvents(info); + perfCntr.onJobPrepare(jobPrepare); + perfCntr.onJobStart(jobStart); + + if (jobMetaCache().putIfAbsent(jobId, meta) != null) + throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); + + return completeFut; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to submit job: " + jobId, e); + + return new GridFinishedFutureEx<>(e); + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Convert Hadoop job metadata to job status. + * + * @param meta Metadata. + * @return Status. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) { + GridHadoopJobInfo jobInfo = meta.jobInfo(); + + return new GridHadoopJobStatus( + meta.jobId(), + jobInfo.jobName(), + jobInfo.user(), + meta.pendingSplits() != null ? meta.pendingSplits().size() : 0, + meta.pendingReducers() != null ? meta.pendingReducers().size() : 0, + meta.mapReducePlan().mappers(), + meta.mapReducePlan().reducers(), + meta.phase(), + meta.failCause() != null, + meta.version() + ); + } + + /** + * Gets hadoop job status for given job ID. + * + * @param jobId Job ID to get status for. + * @return Job status for given job ID or {@code null} if job was not found. + */ + @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) + return null; // Grid is stopping. + + try { + GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + return meta != null ? status(meta) : null; + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Gets job finish future. + * + * @param jobId Job ID. + * @return Finish future or {@code null}. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) + return null; // Grid is stopping. + + try { + GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + if (meta == null) + return null; + + if (log.isTraceEnabled()) + log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); + + if (meta.phase() == PHASE_COMPLETE) { + if (log.isTraceEnabled()) + log.trace("Job is complete, returning finished future: " + jobId); + + return new GridFinishedFutureEx<>(jobId, meta.failCause()); + } + + GridFutureAdapter<GridHadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId, + new GridFutureAdapter<GridHadoopJobId>()); + + // Get meta from cache one more time to close the window. + meta = jobMetaCache().get(jobId); + + if (log.isTraceEnabled()) + log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']'); + + if (meta == null) { + fut.onDone(); + + activeFinishFuts.remove(jobId , fut); + } + else if (meta.phase() == PHASE_COMPLETE) { + fut.onDone(jobId, meta.failCause()); + + activeFinishFuts.remove(jobId , fut); + } + + return fut; + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Gets job plan by job ID. + * + * @param jobId Job ID. + * @return Job plan. + * @throws IgniteCheckedException If failed. + */ + public GridHadoopMapReducePlan plan(GridHadoopJobId jobId) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) + return null; + + try { + GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + if (meta != null) + return meta.mapReducePlan(); + + return null; + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Callback from task executor invoked when a task has been finished. + * + * @param info Task info. + * @param status Task status. + */ + @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) + public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) { + if (!busyLock.tryReadLock()) + return; + + try { + assert status.state() != RUNNING; + + if (log.isDebugEnabled()) + log.debug("Received task finished callback [info=" + info + ", status=" + status + ']'); + + JobLocalState state = activeJobs.get(info.jobId()); + + // Task CRASHes with null fail cause. + assert (status.state() != FAILED) || status.failCause() != null : + "Invalid task status [info=" + info + ", status=" + status + ']'; + + assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)): + "Missing local state for finished task [info=" + info + ", status=" + status + ']'; + + StackedProcessor incrCntrs = null; + + if (status.state() == COMPLETED) + incrCntrs = new IncrementCountersProcessor(null, status.counters()); + + switch (info.type()) { + case SETUP: { + state.onSetupFinished(info, status, incrCntrs); + + break; + } + + case MAP: { + state.onMapFinished(info, status, incrCntrs); + + break; + } + + case REDUCE: { + state.onReduceFinished(info, status, incrCntrs); + + break; + } + + case COMBINE: { + state.onCombineFinished(info, status, incrCntrs); + + break; + } + + case COMMIT: + case ABORT: { + GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache(); + + cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)). + listenAsync(failsLog); + + break; + } + } + } + finally { + busyLock.readUnlock(); + } + } + + /** + * @param jobId Job id. + * @param c Closure of operation. + */ + private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) { + jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog); + } + + /** + * Callback from task executor called when process is ready to received shuffle messages. + * + * @param jobId Job ID. + * @param reducers Reducers. + * @param desc Process descriptor. + */ + public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers, + GridHadoopProcessDescriptor desc) { + transform(jobId, new InitializeReducersProcessor(null, reducers, desc)); + } + + /** + * Gets all input splits for given hadoop map-reduce plan. + * + * @param plan Map-reduce plan. + * @return Collection of all input splits that should be processed. + */ + @SuppressWarnings("ConstantConditions") + private Map<GridHadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) { + Map<GridHadoopInputSplit, Integer> res = new HashMap<>(); + + int taskNum = 0; + + for (UUID nodeId : plan.mapperNodeIds()) { + for (GridHadoopInputSplit split : plan.mappers(nodeId)) { + if (res.put(split, taskNum++) != null) + throw new IllegalStateException("Split duplicate."); + } + } + + return res; + } + + /** + * Gets all reducers for this job. + * + * @param plan Map-reduce plan. + * @return Collection of reducers. + */ + private Collection<Integer> allReducers(GridHadoopMapReducePlan plan) { + Collection<Integer> res = new HashSet<>(); + + for (int i = 0; i < plan.reducers(); i++) + res.add(i); + + return res; + } + + /** + * Processes node leave (or fail) event. + * + * @param evt Discovery event. + */ + @SuppressWarnings("ConstantConditions") + private void processNodeLeft(DiscoveryEvent evt) { + if (log.isDebugEnabled()) + log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']'); + + // Check only if this node is responsible for job status updates. + if (ctx.jobUpdateLeader()) { + boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder(); + + // Iteration over all local entries is correct since system cache is REPLICATED. + for (Object metaObj : jobMetaCache().values()) { + GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj; + + GridHadoopJobId jobId = meta.jobId(); + + GridHadoopMapReducePlan plan = meta.mapReducePlan(); + + GridHadoopJobPhase phase = meta.phase(); + + try { + if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { + // Failover setup task. + GridHadoopJob job = job(jobId, meta.jobInfo()); + + Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId); + + assert setupTask != null; + + ctx.taskExecutor().run(job, setupTask); + } + else if (phase == PHASE_MAP || phase == PHASE_REDUCE) { + // Must check all nodes, even that are not event node ID due to + // multiple node failure possibility. + Collection<GridHadoopInputSplit> cancelSplits = null; + + for (UUID nodeId : plan.mapperNodeIds()) { + if (ctx.kernalContext().discovery().node(nodeId) == null) { + // Node has left the grid. + Collection<GridHadoopInputSplit> mappers = plan.mappers(nodeId); + + if (cancelSplits == null) + cancelSplits = new HashSet<>(); + + cancelSplits.addAll(mappers); + } + } + + Collection<Integer> cancelReducers = null; + + for (UUID nodeId : plan.reducerNodeIds()) { + if (ctx.kernalContext().discovery().node(nodeId) == null) { + // Node has left the grid. + int[] reducers = plan.reducers(nodeId); + + if (cancelReducers == null) + cancelReducers = new HashSet<>(); + + for (int rdc : reducers) + cancelReducers.add(rdc); + } + } + + if (cancelSplits != null || cancelReducers != null) + jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException( + "One or more nodes participating in map-reduce job execution failed."), cancelSplits, + cancelReducers)); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cancel job: " + meta, e); + } + } + } + } + + /** + * @param updated Updated cache entries. + * @throws IgniteCheckedException If failed. + */ + private void processJobMetadataUpdates( + Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated) + throws IgniteCheckedException { + UUID locNodeId = ctx.localNodeId(); + + for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) { + GridHadoopJobId jobId = entry.getKey(); + GridHadoopJobMetadata meta = entry.getValue(); + + if (meta == null || !ctx.isParticipating(meta)) + continue; + + if (log.isDebugEnabled()) + log.debug("Processing job metadata update callback [locNodeId=" + locNodeId + + ", meta=" + meta + ']'); + + try { + ctx.taskExecutor().onJobStateChanged(meta); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process job state changed callback (will fail the job) " + + "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e); + + transform(jobId, new CancelJobProcessor(null, e)); + + continue; + } + + processJobMetaUpdate(jobId, meta, locNodeId); + } + } + + /** + * @param jobId Job ID. + * @param plan Map-reduce plan. + */ + private void printPlan(GridHadoopJobId jobId, GridHadoopMapReducePlan plan) { + log.info("Plan for " + jobId); + + SB b = new SB(); + + b.a(" Map: "); + + for (UUID nodeId : plan.mapperNodeIds()) + b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' '); + + log.info(b.toString()); + + b = new SB(); + + b.a(" Reduce: "); + + for (UUID nodeId : plan.reducerNodeIds()) + b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' '); + + log.info(b.toString()); + } + + /** + * @param jobId Job ID. + * @param meta Job metadata. + * @param locNodeId Local node ID. + * @throws IgniteCheckedException If failed. + */ + private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId) + throws IgniteCheckedException { + JobLocalState state = activeJobs.get(jobId); + + GridHadoopJob job = job(jobId, meta.jobInfo()); + + GridHadoopMapReducePlan plan = meta.mapReducePlan(); + + switch (meta.phase()) { + case PHASE_SETUP: { + if (ctx.jobUpdateLeader()) { + Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId); + + if (setupTask != null) + ctx.taskExecutor().run(job, setupTask); + } + + break; + } + + case PHASE_MAP: { + // Check if we should initiate new task on local node. + Collection<GridHadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta); + + if (tasks != null) + ctx.taskExecutor().run(job, tasks); + + break; + } + + case PHASE_REDUCE: { + if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) { + GridHadoopTaskInfo info = new GridHadoopTaskInfo(COMMIT, jobId, 0, 0, null); + + if (log.isDebugEnabled()) + log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId + + ", jobId=" + jobId + ']'); + + ctx.taskExecutor().run(job, Collections.singletonList(info)); + + break; + } + + Collection<GridHadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job); + + if (tasks != null) + ctx.taskExecutor().run(job, tasks); + + break; + } + + case PHASE_CANCELLING: { + // Prevent multiple task executor notification. + if (state != null && state.onCancel()) { + if (log.isDebugEnabled()) + log.debug("Cancelling local task execution for job: " + meta); + + ctx.taskExecutor().cancelTasks(jobId); + } + + if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) { + if (ctx.jobUpdateLeader()) { + if (state == null) + state = initState(jobId); + + // Prevent running multiple abort tasks. + if (state.onAborted()) { + GridHadoopTaskInfo info = new GridHadoopTaskInfo(ABORT, jobId, 0, 0, null); + + if (log.isDebugEnabled()) + log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId + + ", jobId=" + jobId + ']'); + + ctx.taskExecutor().run(job, Collections.singletonList(info)); + } + } + + break; + } + else { + // Check if there are unscheduled mappers or reducers. + Collection<GridHadoopInputSplit> cancelMappers = new ArrayList<>(); + Collection<Integer> cancelReducers = new ArrayList<>(); + + Collection<GridHadoopInputSplit> mappers = plan.mappers(ctx.localNodeId()); + + if (mappers != null) { + for (GridHadoopInputSplit b : mappers) { + if (state == null || !state.mapperScheduled(b)) + cancelMappers.add(b); + } + } + + int[] rdc = plan.reducers(ctx.localNodeId()); + + if (rdc != null) { + for (int r : rdc) { + if (state == null || !state.reducerScheduled(r)) + cancelReducers.add(r); + } + } + + if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty()) + transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers)); + } + + break; + } + + case PHASE_COMPLETE: { + if (log.isDebugEnabled()) + log.debug("Job execution is complete, will remove local state from active jobs " + + "[jobId=" + jobId + ", meta=" + meta + ']'); + + if (state != null) { + state = activeJobs.remove(jobId); + + assert state != null; + + ctx.shuffle().jobFinished(jobId); + } + + GridFutureAdapter<GridHadoopJobId> finishFut = activeFinishFuts.remove(jobId); + + if (finishFut != null) { + if (log.isDebugEnabled()) + log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']'); + + finishFut.onDone(jobId, meta.failCause()); + } + + if (ctx.jobUpdateLeader()) + job.cleanupStagingDirectory(); + + jobs.remove(jobId); + + job.dispose(false); + + if (ctx.jobUpdateLeader()) { + ClassLoader ldr = job.getClass().getClassLoader(); + + try { + String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY); + + if (statWriterClsName != null) { + Class<?> cls = ldr.loadClass(statWriterClsName); + + GridHadoopCounterWriter writer = (GridHadoopCounterWriter)cls.newInstance(); + + GridHadoopCounters cntrs = meta.counters(); + + writer.write(job.info(), jobId, cntrs); + } + } + catch (Exception e) { + log.error("Can't write statistic due to: ", e); + } + } + + break; + } + + default: + throw new IllegalStateException("Unknown phase: " + meta.phase()); + } + } + + /** + * Creates setup task based on job information. + * + * @param jobId Job ID. + * @return Setup task wrapped in collection. + */ + @Nullable private Collection<GridHadoopTaskInfo> setupTask(GridHadoopJobId jobId) { + if (activeJobs.containsKey(jobId)) + return null; + else { + initState(jobId); + + return Collections.singleton(new GridHadoopTaskInfo(SETUP, jobId, 0, 0, null)); + } + } + + /** + * Creates mapper tasks based on job information. + * + * @param mappers Mapper blocks. + * @param meta Job metadata. + * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. + */ + private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, GridHadoopJobMetadata meta) { + UUID locNodeId = ctx.localNodeId(); + GridHadoopJobId jobId = meta.jobId(); + + JobLocalState state = activeJobs.get(jobId); + + Collection<GridHadoopTaskInfo> tasks = null; + + if (mappers != null) { + if (state == null) + state = initState(jobId); + + for (GridHadoopInputSplit split : mappers) { + if (state.addMapper(split)) { + if (log.isDebugEnabled()) + log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + + ", split=" + split + ']'); + + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split); + + if (tasks == null) + tasks = new ArrayList<>(); + + tasks.add(taskInfo); + } + } + } + + return tasks; + } + + /** + * Creates reducer tasks based on job information. + * + * @param reducers Reducers (may be {@code null}). + * @param job Job instance. + * @return Collection of task infos. + */ + private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, GridHadoopJob job) { + UUID locNodeId = ctx.localNodeId(); + GridHadoopJobId jobId = job.id(); + + JobLocalState state = activeJobs.get(jobId); + + Collection<GridHadoopTaskInfo> tasks = null; + + if (reducers != null) { + if (state == null) + state = initState(job.id()); + + for (int rdc : reducers) { + if (state.addReducer(rdc)) { + if (log.isDebugEnabled()) + log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId + + ", rdc=" + rdc + ']'); + + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(REDUCE, jobId, rdc, 0, null); + + if (tasks == null) + tasks = new ArrayList<>(); + + tasks.add(taskInfo); + } + } + } + + return tasks; + } + + /** + * Initializes local state for given job metadata. + * + * @param jobId Job ID. + * @return Local state. + */ + private JobLocalState initState(GridHadoopJobId jobId) { + return F.addIfAbsent(activeJobs, jobId, new JobLocalState()); + } + + /** + * Gets or creates job instance. + * + * @param jobId Job ID. + * @param jobInfo Job info. + * @return Job. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { + GridFutureAdapterEx<GridHadoopJob> fut = jobs.get(jobId); + + if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<GridHadoopJob>())) != null) + return fut.get(); + + fut = jobs.get(jobId); + + GridHadoopJob job = null; + + try { + if (jobInfo == null) { + GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + if (meta == null) + throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId); + + jobInfo = meta.jobInfo(); + } + + job = jobInfo.createJob(jobId, log); + + job.initialize(false, ctx.localNodeId()); + + fut.onDone(job); + + return job; + } + catch (IgniteCheckedException e) { + fut.onDone(e); + + jobs.remove(jobId, fut); + + if (job != null) { + try { + job.dispose(false); + } + catch (IgniteCheckedException e0) { + U.error(log, "Failed to dispose job: " + jobId, e0); + } + } + + throw e; + } + } + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public boolean killJob(GridHadoopJobId jobId) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) + return false; // Grid is stopping. + + try { + GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) { + HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled."); + + jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err)); + } + } + finally { + busyLock.readUnlock(); + } + + IgniteInternalFuture<?> fut = finishFuture(jobId); + + if (fut != null) { + try { + fut.get(); + } + catch (Throwable e) { + if (e.getCause() instanceof HadoopTaskCancelledException) + return true; + } + } + + return false; + } + + /** + * Returns job counters. + * + * @param jobId Job identifier. + * @return Job counters or {@code null} if job cannot be found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridHadoopCounters jobCounters(GridHadoopJobId jobId) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) + return null; + + try { + final GridHadoopJobMetadata meta = jobMetaCache().get(jobId); + + return meta != null ? meta.counters() : null; + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Event handler protected by busy lock. + */ + private abstract class EventHandler implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + if (!busyLock.tryReadLock()) + return; + + try { + body(); + } + catch (Throwable e) { + U.error(log, "Unhandled exception while processing event.", e); + } + finally { + busyLock.readUnlock(); + } + } + + /** + * Handler body. + */ + protected abstract void body() throws Exception; + } + + /** + * + */ + private class JobLocalState { + /** Mappers. */ + private final Collection<GridHadoopInputSplit> currMappers = new HashSet<>(); + + /** Reducers. */ + private final Collection<Integer> currReducers = new HashSet<>(); + + /** Number of completed mappers. */ + private final AtomicInteger completedMappersCnt = new AtomicInteger(); + + /** Cancelled flag. */ + private boolean cancelled; + + /** Aborted flag. */ + private boolean aborted; + + /** + * @param mapSplit Map split to add. + * @return {@code True} if mapper was added. + */ + private boolean addMapper(GridHadoopInputSplit mapSplit) { + return currMappers.add(mapSplit); + } + + /** + * @param rdc Reducer number to add. + * @return {@code True} if reducer was added. + */ + private boolean addReducer(int rdc) { + return currReducers.add(rdc); + } + + /** + * Checks whether this split was scheduled for given attempt. + * + * @param mapSplit Map split to check. + * @return {@code True} if mapper was scheduled. + */ + public boolean mapperScheduled(GridHadoopInputSplit mapSplit) { + return currMappers.contains(mapSplit); + } + + /** + * Checks whether this split was scheduled for given attempt. + * + * @param rdc Reducer number to check. + * @return {@code True} if reducer was scheduled. + */ + public boolean reducerScheduled(int rdc) { + return currReducers.contains(rdc); + } + + /** + * @param taskInfo Task info. + * @param status Task status. + * @param prev Previous closure. + */ + private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { + final GridHadoopJobId jobId = taskInfo.jobId(); + + if (status.state() == FAILED || status.state() == CRASHED) + transform(jobId, new CancelJobProcessor(prev, status.failCause())); + else + transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP)); + } + + /** + * @param taskInfo Task info. + * @param status Task status. + * @param prev Previous closure. + */ + private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, + final StackedProcessor prev) { + final GridHadoopJobId jobId = taskInfo.jobId(); + + boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size(); + + if (status.state() == FAILED || status.state() == CRASHED) { + // Fail the whole job. + transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause())); + + return; + } + + IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() { + @Override public void applyx(IgniteInternalFuture<?> f) { + Throwable err = null; + + if (f != null) { + try { + f.get(); + } + catch (IgniteCheckedException e) { + err = e; + } + } + + transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err)); + } + }; + + if (lastMapperFinished) + ctx.shuffle().flush(jobId).listenAsync(cacheUpdater); + else + cacheUpdater.apply(null); + } + + /** + * @param taskInfo Task info. + * @param status Task status. + * @param prev Previous closure. + */ + private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) { + GridHadoopJobId jobId = taskInfo.jobId(); + if (status.state() == FAILED || status.state() == CRASHED) + // Fail the whole job. + transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause())); + else + transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber())); + } + + /** + * @param taskInfo Task info. + * @param status Task status. + * @param prev Previous closure. + */ + private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, + final StackedProcessor prev) { + final GridHadoopJobId jobId = taskInfo.jobId(); + + if (status.state() == FAILED || status.state() == CRASHED) + // Fail the whole job. + transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause())); + else { + ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteInternalFuture<?>>() { + @Override public void applyx(IgniteInternalFuture<?> f) { + Throwable err = null; + + if (f != null) { + try { + f.get(); + } + catch (IgniteCheckedException e) { + err = e; + } + } + + transform(jobId, new RemoveMappersProcessor(prev, currMappers, err)); + } + }); + } + } + + /** + * @return {@code True} if job was cancelled by this (first) call. + */ + public boolean onCancel() { + if (!cancelled && !aborted) { + cancelled = true; + + return true; + } + + return false; + } + + /** + * @return {@code True} if job was aborted this (first) call. + */ + public boolean onAborted() { + if (!aborted) { + aborted = true; + + return true; + } + + return false; + } + } + + /** + * Update job phase transform closure. + */ + private static class UpdatePhaseProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** Phase to update. */ + private final GridHadoopJobPhase phase; + + /** + * @param prev Previous closure. + * @param phase Phase to update. + */ + private UpdatePhaseProcessor(@Nullable StackedProcessor prev, GridHadoopJobPhase phase) { + super(prev); + + this.phase = phase; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + cp.phase(phase); + } + } + + /** + * Remove mapper transform closure. + */ + private static class RemoveMappersProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** Mapper split to remove. */ + private final Collection<GridHadoopInputSplit> splits; + + /** Error. */ + private final Throwable err; + + /** + * @param prev Previous closure. + * @param split Mapper split to remove. + * @param err Error. + */ + private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) { + this(prev, Collections.singletonList(split), err); + } + + /** + * @param prev Previous closure. + * @param splits Mapper splits to remove. + * @param err Error. + */ + private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits, + Throwable err) { + super(prev); + + this.splits = splits; + this.err = err; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); + + for (GridHadoopInputSplit s : splits) + splitsCp.remove(s); + + cp.pendingSplits(splitsCp); + + if (cp.phase() != PHASE_CANCELLING && err != null) + cp.failCause(err); + + if (err != null) + cp.phase(PHASE_CANCELLING); + + if (splitsCp.isEmpty()) { + if (cp.phase() != PHASE_CANCELLING) + cp.phase(PHASE_REDUCE); + } + } + } + + /** + * Remove reducer transform closure. + */ + private static class RemoveReducerProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** Mapper split to remove. */ + private final int rdc; + + /** Error. */ + private Throwable err; + + /** + * @param prev Previous closure. + * @param rdc Reducer to remove. + */ + private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) { + super(prev); + + this.rdc = rdc; + } + + /** + * @param prev Previous closure. + * @param rdc Reducer to remove. + * @param err Error. + */ + private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) { + super(prev); + + this.rdc = rdc; + this.err = err; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); + + rdcCp.remove(rdc); + + cp.pendingReducers(rdcCp); + + if (err != null) { + cp.phase(PHASE_CANCELLING); + cp.failCause(err); + } + } + } + + /** + * Initialize reducers. + */ + private static class InitializeReducersProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** Reducers. */ + private final Collection<Integer> rdc; + + /** Process descriptor for reducers. */ + private final GridHadoopProcessDescriptor desc; + + /** + * @param prev Previous closure. + * @param rdc Reducers to initialize. + * @param desc External process descriptor. + */ + private InitializeReducersProcessor(@Nullable StackedProcessor prev, + Collection<Integer> rdc, + GridHadoopProcessDescriptor desc) { + super(prev); + + assert !F.isEmpty(rdc); + assert desc != null; + + this.rdc = rdc; + this.desc = desc; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + Map<Integer, GridHadoopProcessDescriptor> oldMap = meta.reducersAddresses(); + + Map<Integer, GridHadoopProcessDescriptor> rdcMap = oldMap == null ? + new HashMap<Integer, GridHadoopProcessDescriptor>() : new HashMap<>(oldMap); + + for (Integer r : rdc) + rdcMap.put(r, desc); + + cp.reducersAddresses(rdcMap); + } + } + + /** + * Remove reducer transform closure. + */ + private static class CancelJobProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** Mapper split to remove. */ + private final Collection<GridHadoopInputSplit> splits; + + /** Reducers to remove. */ + private final Collection<Integer> rdc; + + /** Error. */ + private final Throwable err; + + /** + * @param prev Previous closure. + * @param err Fail cause. + */ + private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) { + this(prev, err, null, null); + } + + /** + * @param prev Previous closure. + * @param splits Splits to remove. + * @param rdc Reducers to remove. + */ + private CancelJobProcessor(@Nullable StackedProcessor prev, + Collection<GridHadoopInputSplit> splits, + Collection<Integer> rdc) { + this(prev, null, splits, rdc); + } + + /** + * @param prev Previous closure. + * @param err Error. + * @param splits Splits to remove. + * @param rdc Reducers to remove. + */ + private CancelJobProcessor(@Nullable StackedProcessor prev, + Throwable err, + Collection<GridHadoopInputSplit> splits, + Collection<Integer> rdc) { + super(prev); + + this.splits = splits; + this.rdc = rdc; + this.err = err; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta; + + Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers()); + + if (rdc != null) + rdcCp.removeAll(rdc); + + cp.pendingReducers(rdcCp); + + Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); + + if (splits != null) { + for (GridHadoopInputSplit s : splits) + splitsCp.remove(s); + } + + cp.pendingSplits(splitsCp); + + cp.phase(PHASE_CANCELLING); + + if (err != null) + cp.failCause(err); + } + } + + /** + * Increment counter values closure. + */ + private static class IncrementCountersProcessor extends StackedProcessor { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridHadoopCounters counters; + + /** + * @param prev Previous closure. + * @param counters Task counters to add into job counters. + */ + private IncrementCountersProcessor(@Nullable StackedProcessor prev, GridHadoopCounters counters) { + super(prev); + + assert counters != null; + + this.counters = counters; + } + + /** {@inheritDoc} */ + @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) { + GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters()); + + cntrs.merge(counters); + + cp.counters(cntrs); + } + } + + /** + * Abstract stacked closure. + */ + private abstract static class StackedProcessor implements + EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final StackedProcessor prev; + + /** + * @param prev Previous closure. + */ + private StackedProcessor(@Nullable StackedProcessor prev) { + this.prev = prev; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) { + GridHadoopJobMetadata val = apply(e.getValue()); + + if (val != null) + e.setValue(val); + else + e.remove();; + + return null; + } + + /** + * @param meta Old value. + * @return New value. + */ + private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) { + if (meta == null) + return null; + + GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta); + + update(meta, cp); + + return cp; + } + + /** + * Update given job metadata object. + * + * @param meta Initial job metadata. + * @param cp Copy. + */ + protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java index c734acd..8fdab9d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java @@ -37,7 +37,7 @@ public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapt GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); - GridHadoopDefaultJobInfo info = args.get(2); + HadoopDefaultJobInfo info = args.get(2); assert nodeId != null; assert id != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java index 4c83ace..66fb230 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java @@ -37,7 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Hadoop client protocol. @@ -238,7 +238,7 @@ public class HadoopClientProtocol implements ClientProtocol { @Override public String getStagingAreaDir() throws IOException, InterruptedException { String usr = UserGroupInformation.getCurrentUser().getShortUserName(); - return GridHadoopUtils.stagingAreaDir(conf, usr).toString(); + return HadoopUtils.stagingAreaDir(conf, usr).toString(); } /** {@inheritDoc} */ @@ -327,6 +327,6 @@ public class HadoopClientProtocol implements ClientProtocol { else assert lastStatus != null; - return GridHadoopUtils.status(lastStatus, conf); + return HadoopUtils.status(lastStatus, conf); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java deleted file mode 100644 index 396124e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Shuffle. - */ -public class GridHadoopShuffle extends GridHadoopComponent { - /** */ - private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); - - /** */ - protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** {@inheritDoc} */ - @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException { - super.start(ctx); - - ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, - new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - return onMessageReceived(nodeId, (GridHadoopMessage)msg); - } - }); - } - - /** - * Stops shuffle. - * - * @param cancel If should cancel all ongoing activities. - */ - @Override public void stop(boolean cancel) { - for (GridHadoopShuffleJob job : jobs.values()) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job.", e); - } - } - - jobs.clear(); - } - - /** - * Creates new shuffle job. - * - * @param jobId Job ID. - * @return Created shuffle job. - * @throws IgniteCheckedException If job creation failed. - */ - private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); - - GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log, - ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); - - UUID[] rdcAddrs = new UUID[plan.reducers()]; - - for (int i = 0; i < rdcAddrs.length; i++) { - UUID nodeId = plan.nodeForReducer(i); - - assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']'; - - rdcAddrs[i] = nodeId; - } - - boolean init = job.initializeReduceAddresses(rdcAddrs); - - assert init; - - return job; - } - - /** - * @param nodeId Node ID to send message to. - * @param msg Message to send. - * @throws IgniteCheckedException If send failed. - */ - private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { - ClusterNode node = ctx.kernalContext().discovery().node(nodeId); - - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); - } - - /** - * @param jobId Task info. - * @return Shuffle job. - */ - private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopShuffleJob<UUID> res = jobs.get(jobId); - - if (res == null) { - res = newJob(jobId); - - GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); - - if (old != null) { - res.close(); - - res = old; - } - else if (res.reducersInitialized()) - startSending(res); - } - - return res; - } - - /** - * Starts message sending thread. - * - * @param shuffleJob Job to start sending for. - */ - private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) { - shuffleJob.startSending(ctx.kernalContext().gridName(), - new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() { - @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException { - send0(dest, msg); - } - } - ); - } - - /** - * Message received callback. - * - * @param src Sender node ID. - * @param msg Received message. - * @return {@code True}. - */ - public boolean onMessageReceived(UUID src, GridHadoopMessage msg) { - if (msg instanceof GridHadoopShuffleMessage) { - GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg; - - try { - job(m.jobId()).onShuffleMessage(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - - try { - // Reply with ack. - send0(src, new GridHadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); - } - } - else if (msg instanceof GridHadoopShuffleAck) { - GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg; - - try { - job(m.jobId()).onShuffleAck(m); - } - catch (IgniteCheckedException e) { - U.error(log, "Message handling failed.", e); - } - } - else - throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + - ", msg=" + msg + ']'); - - return true; - } - - /** - * @param taskCtx Task info. - * @return Output. - */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).output(taskCtx); - } - - /** - * @param taskCtx Task info. - * @return Input. - */ - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return job(taskCtx.taskInfo().jobId()).input(taskCtx); - } - - /** - * @param jobId Job id. - */ - public void jobFinished(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.remove(jobId); - - if (job != null) { - try { - job.close(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close job: " + jobId, e); - } - } - } - - /** - * Flushes all the outputs for the given job to remote nodes. - * - * @param jobId Job ID. - * @return Future. - */ - public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.get(jobId); - - if (job == null) - return new GridFinishedFutureEx<>(); - - try { - return job.flush(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(e); - } - } - - /** - * @return Memory. - */ - public GridUnsafeMemory memory() { - return mem; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java new file mode 100644 index 0000000..9880093 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Shuffle. + */ +public class HadoopShuffle extends HadoopComponent { + /** */ + private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); + + /** */ + protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); + + /** {@inheritDoc} */ + @Override public void start(HadoopContext ctx) throws IgniteCheckedException { + super.start(ctx); + + ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, + new IgniteBiPredicate<UUID, Object>() { + @Override public boolean apply(UUID nodeId, Object msg) { + return onMessageReceived(nodeId, (GridHadoopMessage)msg); + } + }); + } + + /** + * Stops shuffle. + * + * @param cancel If should cancel all ongoing activities. + */ + @Override public void stop(boolean cancel) { + for (GridHadoopShuffleJob job : jobs.values()) { + try { + job.close(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close job.", e); + } + } + + jobs.clear(); + } + + /** + * Creates new shuffle job. + * + * @param jobId Job ID. + * @return Created shuffle job. + * @throws IgniteCheckedException If job creation failed. + */ + private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { + GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); + + GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log, + ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); + + UUID[] rdcAddrs = new UUID[plan.reducers()]; + + for (int i = 0; i < rdcAddrs.length; i++) { + UUID nodeId = plan.nodeForReducer(i); + + assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']'; + + rdcAddrs[i] = nodeId; + } + + boolean init = job.initializeReduceAddresses(rdcAddrs); + + assert init; + + return job; + } + + /** + * @param nodeId Node ID to send message to. + * @param msg Message to send. + * @throws IgniteCheckedException If send failed. + */ + private void send0(UUID nodeId, Object msg) throws IgniteCheckedException { + ClusterNode node = ctx.kernalContext().discovery().node(nodeId); + + ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); + } + + /** + * @param jobId Task info. + * @return Shuffle job. + */ + private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { + GridHadoopShuffleJob<UUID> res = jobs.get(jobId); + + if (res == null) { + res = newJob(jobId); + + GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); + + if (old != null) { + res.close(); + + res = old; + } + else if (res.reducersInitialized()) + startSending(res); + } + + return res; + } + + /** + * Starts message sending thread. + * + * @param shuffleJob Job to start sending for. + */ + private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) { + shuffleJob.startSending(ctx.kernalContext().gridName(), + new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() { + @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException { + send0(dest, msg); + } + } + ); + } + + /** + * Message received callback. + * + * @param src Sender node ID. + * @param msg Received message. + * @return {@code True}. + */ + public boolean onMessageReceived(UUID src, GridHadoopMessage msg) { + if (msg instanceof GridHadoopShuffleMessage) { + GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg; + + try { + job(m.jobId()).onShuffleMessage(m); + } + catch (IgniteCheckedException e) { + U.error(log, "Message handling failed.", e); + } + + try { + // Reply with ack. + send0(src, new GridHadoopShuffleAck(m.id(), m.jobId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); + } + } + else if (msg instanceof GridHadoopShuffleAck) { + GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg; + + try { + job(m.jobId()).onShuffleAck(m); + } + catch (IgniteCheckedException e) { + U.error(log, "Message handling failed.", e); + } + } + else + throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src + + ", msg=" + msg + ']'); + + return true; + } + + /** + * @param taskCtx Task info. + * @return Output. + */ + public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + return job(taskCtx.taskInfo().jobId()).output(taskCtx); + } + + /** + * @param taskCtx Task info. + * @return Input. + */ + public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + return job(taskCtx.taskInfo().jobId()).input(taskCtx); + } + + /** + * @param jobId Job id. + */ + public void jobFinished(GridHadoopJobId jobId) { + GridHadoopShuffleJob job = jobs.remove(jobId); + + if (job != null) { + try { + job.close(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close job: " + jobId, e); + } + } + } + + /** + * Flushes all the outputs for the given job to remote nodes. + * + * @param jobId Job ID. + * @return Future. + */ + public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) { + GridHadoopShuffleJob job = jobs.get(jobId); + + if (job == null) + return new GridFinishedFutureEx<>(); + + try { + return job.flush(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFutureEx<>(e); + } + } + + /** + * @return Memory. + */ + public GridUnsafeMemory memory() { + return mem; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java deleted file mode 100644 index fde5400..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; - - -/** - * Task executor. - */ -public class GridHadoopEmbeddedTaskExecutor extends GridHadoopTaskExecutorAdapter { - /** Job tracker. */ - private GridHadoopJobTracker jobTracker; - - /** */ - private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); - - /** Executor service to run tasks. */ - private GridHadoopExecutorService exec; - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobTracker = ctx.jobTracker(); - - exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(), - ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (exec != null) { - exec.shutdown(3000); - - if (cancel) { - for (GridHadoopJobId jobId : jobs.keySet()) - cancelTasks(jobId); - } - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - if (exec != null && !exec.shutdown(30000)) - U.warn(log, "Failed to finish running tasks in 30 sec."); - } - - /** {@inheritDoc} */ - @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + - ", tasksCnt=" + tasks.size() + ']'); - - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id()); - - if (executedTasks == null) { - executedTasks = new GridConcurrentHashSet<>(); - - Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); - - assert extractedCol == null; - } - - final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks; - - for (final GridHadoopTaskInfo info : tasks) { - assert info != null; - - GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info, - ctx.localNodeId()) { - @Override protected void onTaskFinished(GridHadoopTaskStatus status) { - if (log.isDebugEnabled()) - log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + - "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); - - finalExecutedTasks.remove(this); - - jobTracker.onTaskFinished(info, status); - } - - @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().input(taskCtx); - } - - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().output(taskCtx); - } - }; - - executedTasks.add(task); - - exec.submit(task); - } - } - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - @Override public void cancelTasks(GridHadoopJobId jobId) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId); - - if (executedTasks != null) { - for (GridHadoopRunnableTask task : executedTasks) - task.cancel(); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException { - if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); - - assert executedTasks == null || executedTasks.isEmpty(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java index fd4a030..0d49be9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java @@ -130,7 +130,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { } } } - catch (GridHadoopTaskCancelledException ignored) { + catch (HadoopTaskCancelledException ignored) { state = GridHadoopTaskState.CANCELED; } catch (Throwable e) { @@ -163,7 +163,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { */ private void runTask(GridHadoopPerformanceCounter perfCntr) throws IgniteCheckedException { if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); + throw new HadoopTaskCancelledException("Task cancelled."); try (GridHadoopTaskOutput out = createOutputInternal(ctx); GridHadoopTaskInput in = createInputInternal(ctx)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java deleted file mode 100644 index 8f66190..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; - -import java.util.*; - -/** - * Common superclass for task executor. - */ -public abstract class GridHadoopTaskExecutorAdapter extends GridHadoopComponent { - /** - * Runs tasks. - * - * @param job Job. - * @param tasks Tasks. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException; - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * On job state change callback; - * - * @param meta Job metadata. - */ - public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException; -}