http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java new file mode 100644 index 0000000..41d9847 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskOutput.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +/** + * Task output. + */ +public interface HadoopTaskOutput extends AutoCloseable { + /** + * Writes key and value to the output. + * + * @param key Key. + * @param val Value. + */ + public void write(Object key, Object val) throws IgniteCheckedException; + + /** + * Closes output. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java new file mode 100644 index 0000000..a88e189 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskType.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.*; + +/** +* Task type. +*/ +public enum HadoopTaskType { + /** Setup task. */ + SETUP, + + /** Map task. */ + MAP, + + /** Reduce task. */ + REDUCE, + + /** Combine task. */ + COMBINE, + + /** Commit task. */ + COMMIT, + + /** Abort task. */ + ABORT; + + /** Enumerated values. */ + private static final HadoopTaskType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static HadoopTaskType fromOrdinal(byte ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java index d0ef4ce..caa9194 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java @@ -33,7 +33,7 @@ public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { + @Override public Hadoop hadoop() { throw new IllegalStateException("Hadoop module is not found in class path."); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java index c2cf542..d40d5e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java @@ -35,7 +35,7 @@ public abstract class IgniteHadoopProcessorAdapter extends GridProcessorAdapter /** * @return Hadoop facade. */ - public abstract GridHadoop hadoop(); + public abstract Hadoop hadoop(); /** * @return Hadoop configuration. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 449cff2..01e554c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -31,7 +31,7 @@ import java.util.*; /** * Statistic writer implementation that writes info into any Hadoop file system. */ -public class IgniteHadoopFileSystemCounterWriter implements GridHadoopCounterWriter { +public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter { /** */ public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java index 3482640..39b9ba6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java @@ -39,7 +39,7 @@ public class HadoopCounters extends Counters { * @param cntrs Counters to adapt. */ public HadoopCounters(GridHadoopCounters cntrs) { - for (GridHadoopCounter cntr : cntrs.all()) + for (HadoopCounter cntr : cntrs.all()) if (cntr instanceof HadoopLongCounter) this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); } @@ -73,7 +73,7 @@ public class HadoopCounters extends Counters { @Override public synchronized Iterable<String> getGroupNames() { Collection<String> res = new HashSet<>(); - for (GridHadoopCounter counter : cntrs.values()) + for (HadoopCounter counter : cntrs.values()) res.add(counter.group()); return res; @@ -167,7 +167,7 @@ public class HadoopCounters extends Counters { public int groupSize(String grpName) { int res = 0; - for (GridHadoopCounter counter : cntrs.values()) { + for (HadoopCounter counter : cntrs.values()) { if (grpName.equals(counter.group())) res++; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 2f44778..438874a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -82,7 +82,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { try { Class<?> jobCls0 = jobCls; @@ -99,7 +99,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, HadoopDefaultJobInfo.class, IgniteLogger.class); - return (GridHadoopJob)constructor.newInstance(jobId, this, log); + return (HadoopJob)constructor.newInstance(jobId, this, log); } // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java index b87e7f8..b4f2c87 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java @@ -25,7 +25,7 @@ import org.jetbrains.annotations.*; /** * Hadoop facade implementation. */ -public class HadoopImpl implements GridHadoop { +public class HadoopImpl implements Hadoop { /** Hadoop processor. */ private final HadoopProcessor proc; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java index 1f50b0c..75e55fd 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -45,7 +45,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { /** Hadoop facade for public API. */ @GridToStringExclude - private GridHadoop hadoop; + private Hadoop hadoop; /** * @param ctx Kernal context. @@ -158,7 +158,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { + @Override public Hadoop hadoop() { if (hadoop == null) throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " + "is HADOOP_HOME environment variable set?)"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java index 3fdce14..4b96f7d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java @@ -26,7 +26,7 @@ import java.io.*; /** * Default Hadoop counter implementation. */ -public abstract class HadoopCounterAdapter implements GridHadoopCounter, Externalizable { +public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java index 01b1473..bfd59ef 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java @@ -36,7 +36,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { private static final long serialVersionUID = 0L; /** */ - private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>(); + private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>(); /** * Default constructor. Creates new instance without counters. @@ -50,7 +50,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { * * @param cntrs Counters to store. */ - public HadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) { + public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) { addCounters(cntrs, true); } @@ -71,7 +71,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { * @param name Counter name. * @return Counter. */ - private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp, + private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp, String name) { try { Constructor constructor = cls.getConstructor(String.class, String.class); @@ -89,12 +89,12 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { * @param cntrs Counters to add. * @param cp Whether to copy counters or not. */ - private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) { + private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) { assert cntrs != null; - for (GridHadoopCounter cntr : cntrs) { + for (HadoopCounter cntr : cntrs) { if (cp) { - GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); + HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name()); cntrCp.merge(cntr); @@ -106,7 +106,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { } /** {@inheritDoc} */ - @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { assert cls != null; CounterKey mapKey = new CounterKey(cls, grp, name); @@ -126,13 +126,13 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { } /** {@inheritDoc} */ - @Override public Collection<GridHadoopCounter> all() { + @Override public Collection<HadoopCounter> all() { return cntrsMap.values(); } /** {@inheritDoc} */ @Override public void merge(GridHadoopCounters other) { - for (GridHadoopCounter counter : other.all()) + for (HadoopCounter counter : other.all()) counter(counter.group(), counter.name(), counter.getClass()).merge(counter); } @@ -144,7 +144,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - addCounters(U.<GridHadoopCounter>readCollection(in), false); + addCounters(U.<HadoopCounter>readCollection(in), false); } /** {@inheritDoc} */ @@ -173,7 +173,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { /** * The tuple of counter identifier components for more readable code. */ - private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> { + private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> { /** */ private static final long serialVersionUID = 0L; @@ -184,7 +184,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { * @param grp Group name. * @param name Counter name. */ - private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) { + private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) { super(cls, grp, name); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java index 1aa1e0e..d926706 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java @@ -59,7 +59,7 @@ public class HadoopLongCounter extends HadoopCounterAdapter { } /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { + @Override public void merge(HadoopCounter cntr) { val += ((HadoopLongCounter)cntr).val; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java index f22d0cd..6f57ae4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -97,7 +97,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { } /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounter cntr) { + @Override public void merge(HadoopCounter cntr) { evts.addAll(((HadoopPerformanceCounter)cntr).evts); } @@ -162,7 +162,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param ts Timestamp of the event. */ public void onTaskFinish(GridHadoopTaskInfo info, long ts) { - if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) { + if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java index 6042775..2d64277 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java @@ -48,7 +48,7 @@ public class HadoopJobMetadata implements Externalizable { private GridHadoopMapReducePlan mrPlan; /** Pending splits for which mapper should be executed. */ - private Map<GridHadoopInputSplit, Integer> pendingSplits; + private Map<HadoopInputSplit, Integer> pendingSplits; /** Pending reducers. */ private Collection<Integer> pendingReducers; @@ -154,7 +154,7 @@ public class HadoopJobMetadata implements Externalizable { * * @param pendingSplits Collection of pending splits. */ - public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) { + public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) { this.pendingSplits = pendingSplits; } @@ -163,7 +163,7 @@ public class HadoopJobMetadata implements Externalizable { * * @return Collection of pending splits. */ - public Map<GridHadoopInputSplit, Integer> pendingSplits() { + public Map<HadoopInputSplit, Integer> pendingSplits() { return pendingSplits; } @@ -261,7 +261,7 @@ public class HadoopJobMetadata implements Externalizable { * @param split Split. * @return Task number. */ - public int taskNumber(GridHadoopInputSplit split) { + public int taskNumber(HadoopInputSplit split) { return pendingSplits.get(split); } @@ -287,7 +287,7 @@ public class HadoopJobMetadata implements Externalizable { jobId = (GridHadoopJobId)in.readObject(); jobInfo = (GridHadoopJobInfo)in.readObject(); mrPlan = (GridHadoopMapReducePlan)in.readObject(); - pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject(); + pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject(); pendingReducers = (Collection<Integer>)in.readObject(); phase = (GridHadoopJobPhase)in.readObject(); failCause = (Throwable)in.readObject(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index a0ae3f6..99a759d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -45,7 +45,7 @@ import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*; /** @@ -66,7 +66,7 @@ public class HadoopJobTracker extends HadoopComponent { private GridHadoopMapReducePlanner mrPlanner; /** All the known jobs. */ - private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<GridHadoopJob>> jobs = new ConcurrentHashMap8<>(); + private final ConcurrentMap<GridHadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>(); /** Locally active jobs. */ private final ConcurrentMap<GridHadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>(); @@ -246,7 +246,7 @@ public class HadoopJobTracker extends HadoopComponent { if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId)) throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId); - GridHadoopJob job = job(jobId, info); + HadoopJob job = job(jobId, info); GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null); @@ -511,13 +511,13 @@ public class HadoopJobTracker extends HadoopComponent { * @return Collection of all input splits that should be processed. */ @SuppressWarnings("ConstantConditions") - private Map<GridHadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) { - Map<GridHadoopInputSplit, Integer> res = new HashMap<>(); + private Map<HadoopInputSplit, Integer> allSplits(GridHadoopMapReducePlan plan) { + Map<HadoopInputSplit, Integer> res = new HashMap<>(); int taskNum = 0; for (UUID nodeId : plan.mapperNodeIds()) { - for (GridHadoopInputSplit split : plan.mappers(nodeId)) { + for (HadoopInputSplit split : plan.mappers(nodeId)) { if (res.put(split, taskNum++) != null) throw new IllegalStateException("Split duplicate."); } @@ -568,7 +568,7 @@ public class HadoopJobTracker extends HadoopComponent { try { if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) { // Failover setup task. - GridHadoopJob job = job(jobId, meta.jobInfo()); + HadoopJob job = job(jobId, meta.jobInfo()); Collection<GridHadoopTaskInfo> setupTask = setupTask(jobId); @@ -579,12 +579,12 @@ public class HadoopJobTracker extends HadoopComponent { else if (phase == PHASE_MAP || phase == PHASE_REDUCE) { // Must check all nodes, even that are not event node ID due to // multiple node failure possibility. - Collection<GridHadoopInputSplit> cancelSplits = null; + Collection<HadoopInputSplit> cancelSplits = null; for (UUID nodeId : plan.mapperNodeIds()) { if (ctx.kernalContext().discovery().node(nodeId) == null) { // Node has left the grid. - Collection<GridHadoopInputSplit> mappers = plan.mappers(nodeId); + Collection<HadoopInputSplit> mappers = plan.mappers(nodeId); if (cancelSplits == null) cancelSplits = new HashSet<>(); @@ -693,7 +693,7 @@ public class HadoopJobTracker extends HadoopComponent { throws IgniteCheckedException { JobLocalState state = activeJobs.get(jobId); - GridHadoopJob job = job(jobId, meta.jobInfo()); + HadoopJob job = job(jobId, meta.jobInfo()); GridHadoopMapReducePlan plan = meta.mapReducePlan(); @@ -770,13 +770,13 @@ public class HadoopJobTracker extends HadoopComponent { } else { // Check if there are unscheduled mappers or reducers. - Collection<GridHadoopInputSplit> cancelMappers = new ArrayList<>(); + Collection<HadoopInputSplit> cancelMappers = new ArrayList<>(); Collection<Integer> cancelReducers = new ArrayList<>(); - Collection<GridHadoopInputSplit> mappers = plan.mappers(ctx.localNodeId()); + Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId()); if (mappers != null) { - for (GridHadoopInputSplit b : mappers) { + for (HadoopInputSplit b : mappers) { if (state == null || !state.mapperScheduled(b)) cancelMappers.add(b); } @@ -836,7 +836,7 @@ public class HadoopJobTracker extends HadoopComponent { if (statWriterClsName != null) { Class<?> cls = ldr.loadClass(statWriterClsName); - GridHadoopCounterWriter writer = (GridHadoopCounterWriter)cls.newInstance(); + HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance(); GridHadoopCounters cntrs = meta.counters(); @@ -879,7 +879,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param meta Job metadata. * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node. */ - private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, HadoopJobMetadata meta) { + private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) { UUID locNodeId = ctx.localNodeId(); GridHadoopJobId jobId = meta.jobId(); @@ -891,7 +891,7 @@ public class HadoopJobTracker extends HadoopComponent { if (state == null) state = initState(jobId); - for (GridHadoopInputSplit split : mappers) { + for (HadoopInputSplit split : mappers) { if (state.addMapper(split)) { if (log.isDebugEnabled()) log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId + @@ -917,7 +917,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param job Job instance. * @return Collection of task infos. */ - private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, GridHadoopJob job) { + private Collection<GridHadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) { UUID locNodeId = ctx.localNodeId(); GridHadoopJobId jobId = job.id(); @@ -966,15 +966,15 @@ public class HadoopJobTracker extends HadoopComponent { * @return Job. * @throws IgniteCheckedException If failed. */ - @Nullable public GridHadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { - GridFutureAdapterEx<GridHadoopJob> fut = jobs.get(jobId); + @Nullable public HadoopJob job(GridHadoopJobId jobId, @Nullable GridHadoopJobInfo jobInfo) throws IgniteCheckedException { + GridFutureAdapterEx<HadoopJob> fut = jobs.get(jobId); - if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<GridHadoopJob>())) != null) + if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<HadoopJob>())) != null) return fut.get(); fut = jobs.get(jobId); - GridHadoopJob job = null; + HadoopJob job = null; try { if (jobInfo == null) { @@ -1103,7 +1103,7 @@ public class HadoopJobTracker extends HadoopComponent { */ private class JobLocalState { /** Mappers. */ - private final Collection<GridHadoopInputSplit> currMappers = new HashSet<>(); + private final Collection<HadoopInputSplit> currMappers = new HashSet<>(); /** Reducers. */ private final Collection<Integer> currReducers = new HashSet<>(); @@ -1121,7 +1121,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param mapSplit Map split to add. * @return {@code True} if mapper was added. */ - private boolean addMapper(GridHadoopInputSplit mapSplit) { + private boolean addMapper(HadoopInputSplit mapSplit) { return currMappers.add(mapSplit); } @@ -1139,7 +1139,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param mapSplit Map split to check. * @return {@code True} if mapper was scheduled. */ - public boolean mapperScheduled(GridHadoopInputSplit mapSplit) { + public boolean mapperScheduled(HadoopInputSplit mapSplit) { return currMappers.contains(mapSplit); } @@ -1315,7 +1315,7 @@ public class HadoopJobTracker extends HadoopComponent { private static final long serialVersionUID = 0L; /** Mapper split to remove. */ - private final Collection<GridHadoopInputSplit> splits; + private final Collection<HadoopInputSplit> splits; /** Error. */ private final Throwable err; @@ -1325,7 +1325,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param split Mapper split to remove. * @param err Error. */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, GridHadoopInputSplit split, Throwable err) { + private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) { this(prev, Collections.singletonList(split), err); } @@ -1334,7 +1334,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param splits Mapper splits to remove. * @param err Error. */ - private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<GridHadoopInputSplit> splits, + private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits, Throwable err) { super(prev); @@ -1344,9 +1344,9 @@ public class HadoopJobTracker extends HadoopComponent { /** {@inheritDoc} */ @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) { - Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); + Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); - for (GridHadoopInputSplit s : splits) + for (HadoopInputSplit s : splits) splitsCp.remove(s); cp.pendingSplits(splitsCp); @@ -1466,7 +1466,7 @@ public class HadoopJobTracker extends HadoopComponent { private static final long serialVersionUID = 0L; /** Mapper split to remove. */ - private final Collection<GridHadoopInputSplit> splits; + private final Collection<HadoopInputSplit> splits; /** Reducers to remove. */ private final Collection<Integer> rdc; @@ -1488,7 +1488,7 @@ public class HadoopJobTracker extends HadoopComponent { * @param rdc Reducers to remove. */ private CancelJobProcessor(@Nullable StackedProcessor prev, - Collection<GridHadoopInputSplit> splits, + Collection<HadoopInputSplit> splits, Collection<Integer> rdc) { this(prev, null, splits, rdc); } @@ -1501,7 +1501,7 @@ public class HadoopJobTracker extends HadoopComponent { */ private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err, - Collection<GridHadoopInputSplit> splits, + Collection<HadoopInputSplit> splits, Collection<Integer> rdc) { super(prev); @@ -1521,10 +1521,10 @@ public class HadoopJobTracker extends HadoopComponent { cp.pendingReducers(rdcCp); - Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); + Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits()); if (splits != null) { - for (GridHadoopInputSplit s : splits) + for (HadoopInputSplit s : splits) splitsCp.remove(s); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java index 9ec2b5b..f24e8f2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java @@ -30,7 +30,7 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { private static final long serialVersionUID = 0L; /** Mappers map. */ - private Map<UUID, Collection<GridHadoopInputSplit>> mappers; + private Map<UUID, Collection<HadoopInputSplit>> mappers; /** Reducers map. */ private Map<UUID, int[]> reducers; @@ -45,13 +45,13 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { * @param mappers Mappers map. * @param reducers Reducers map. */ - public HadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers, + public HadoopDefaultMapReducePlan(Map<UUID, Collection<HadoopInputSplit>> mappers, Map<UUID, int[]> reducers) { this.mappers = mappers; this.reducers = reducers; if (mappers != null) { - for (Collection<GridHadoopInputSplit> splits : mappers.values()) + for (Collection<HadoopInputSplit> splits : mappers.values()) mappersCnt += splits.size(); } @@ -86,7 +86,7 @@ public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { } /** {@inheritDoc} */ - @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) { + @Override @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId) { return mappers.get(nodeId); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java index a625b3d..6e6e874 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java @@ -47,7 +47,7 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner private IgniteLogger log; /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, + @Override public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { // Convert collection of topology nodes to collection of topology node IDs. Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); @@ -55,7 +55,7 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner for (ClusterNode topNode : top) topIds.add(topNode.id()); - Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input()); + Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input()); int rdcCnt = job.info().reducers(); @@ -76,9 +76,9 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner * @return Mappers map. * @throws IgniteCheckedException If failed. */ - private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, - Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException { - Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); + private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, + Iterable<HadoopInputSplit> splits) throws IgniteCheckedException { + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); Map<String, Collection<UUID>> nodes = hosts(top); @@ -87,13 +87,13 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner for (UUID nodeId : topIds) nodeLoads.put(nodeId, 0); - for (GridHadoopInputSplit split : splits) { + for (HadoopInputSplit split : splits) { UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); if (log.isDebugEnabled()) log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); - Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId); + Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId); if (nodeSplits == null) { nodeSplits = new ArrayList<>(); @@ -147,10 +147,10 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner * @return Node ID. */ @SuppressWarnings("unchecked") - private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, + private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock split0 = (GridHadoopFileBlock)split; + if (split instanceof HadoopFileBlock) { + HadoopFileBlock split0 = (HadoopFileBlock)split; if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); @@ -293,14 +293,14 @@ public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner * @return Reducers map. */ private Map<UUID, int[]> reducers(Collection<ClusterNode> top, - Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) { + Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) { // Determine initial node weights. int totalWeight = 0; List<WeightedNode> nodes = new ArrayList<>(top.size()); for (ClusterNode node : top) { - Collection<GridHadoopInputSplit> split = mappers.get(node.id()); + Collection<HadoopInputSplit> split = mappers.get(node.id()); int weight = reducerNodeWeight(node, split != null ? split.size() : 0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java index 56da194..6625d7d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java @@ -31,7 +31,7 @@ public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<Gri private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop, + @Override public GridHadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java index ac70c44..0714eb1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java @@ -40,7 +40,7 @@ public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridH private static final String ATTR_HELD = "held"; /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop, + @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java index 8522ab0..fc0e484 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java @@ -31,7 +31,7 @@ public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop, + @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java index 357e12d..e30feb7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java @@ -28,7 +28,7 @@ public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<Grid private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop, + @Override public GridHadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) { return hadoop.nextJobId(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java index df03c79..1da4b58 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java @@ -33,7 +33,7 @@ public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridH private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop, + @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException { UUID nodeId = UUID.fromString(args.<String>get(0)); Integer id = args.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java index 6938d1c..f763ccc 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java @@ -108,6 +108,6 @@ public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<Hadoop * @return Job result. * @throws IgniteCheckedException If failed. */ - public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, HadoopProtocolTaskArguments args) + public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 267316e..f3c7837 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -199,7 +199,7 @@ public class HadoopShuffle extends HadoopComponent { * @param taskCtx Task info. * @return Output. */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException { return job(taskCtx.taskInfo().jobId()).output(taskCtx); } @@ -207,7 +207,7 @@ public class HadoopShuffle extends HadoopComponent { * @param taskCtx Task info. * @return Input. */ - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { return job(taskCtx.taskInfo().jobId()).input(taskCtx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index a75b34b..3dab6eb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -47,7 +47,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { private static final int MSG_BUF_SIZE = 128 * 1024; /** */ - private final GridHadoopJob job; + private final HadoopJob job; /** */ private final GridUnsafeMemory mem; @@ -56,7 +56,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { private final boolean needPartitioner; /** Collection of task contexts for each reduce task. */ - private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>(); + private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>(); /** Reducers addresses. */ private T[] reduceAddrs; @@ -98,7 +98,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @param locReducers Reducers will work on current node. * @throws IgniteCheckedException If error. */ - public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, + public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; this.job = job; @@ -107,7 +107,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null); + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); reducersCtx.put(rdc, job.getTaskContext(taskInfo)); } @@ -204,7 +204,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { assert msg.buffer() != null; assert msg.offset() > 0; - GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); + HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); @@ -487,7 +487,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @return Output. * @throws IgniteCheckedException If failed. */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException { switch (taskCtx.taskInfo().type()) { case MAP: assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined."; @@ -506,7 +506,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { switch (taskCtx.taskInfo().type()) { case REDUCE: int reducer = taskCtx.taskInfo().taskNumber(); @@ -516,7 +516,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { if (m != null) return m.input(taskCtx); - return new GridHadoopTaskInput() { // Empty input. + return new HadoopTaskInput() { // Empty input. @Override public boolean next() { return false; } @@ -542,21 +542,21 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** * Partitioned output. */ - private class PartitionedOutput implements GridHadoopTaskOutput { + private class PartitionedOutput implements HadoopTaskOutput { /** */ - private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()]; + private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()]; /** */ - private GridHadoopPartitioner partitioner; + private HadoopPartitioner partitioner; /** */ - private final GridHadoopTaskContext taskCtx; + private final HadoopTaskContext taskCtx; /** * Constructor. * @param taskCtx Task context. */ - private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException { this.taskCtx = taskCtx; if (needPartitioner) @@ -574,7 +574,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { throw new IgniteCheckedException("Invalid partition: " + part); } - GridHadoopTaskOutput out = adders[part]; + HadoopTaskOutput out = adders[part]; if (out == null) adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); @@ -584,7 +584,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable { /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - for (GridHadoopTaskOutput adder : adders) { + for (HadoopTaskOutput adder : adders) { if (adder != null) adder.close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java index 46d8bc9..82da910 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java @@ -87,7 +87,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase { * @return Adder object. * @param ctx Task context. */ - @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { if (inputs.get() != 0) throw new IllegalStateException("Active inputs."); @@ -162,7 +162,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase { } /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { inputs.incrementAndGet(); if (!adders.isEmpty()) @@ -369,7 +369,7 @@ public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase { * @param ctx Task context. * @throws IgniteCheckedException If failed. */ - private AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException { + private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { super(ctx); keyReader = new Reader(keySer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java index 15b93c6..fcf8e17 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java @@ -46,7 +46,7 @@ public class HadoopHashMultimap extends HadoopHashMultimapBase { } /** {@inheritDoc} */ - @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { return new AdderImpl(ctx); } @@ -103,7 +103,7 @@ public class HadoopHashMultimap extends HadoopHashMultimapBase { * @param ctx Task context. * @throws IgniteCheckedException If failed. */ - protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException { + protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { super(ctx); keyReader = new Reader(keySer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java index f62a354..c464fd1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java @@ -41,7 +41,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { } /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { return new Input(taskCtx); } @@ -120,7 +120,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { /** * @param ser Serialization. */ - protected Reader(GridHadoopSerialization ser) { + protected Reader(HadoopSerialization ser) { super(ser); } @@ -143,7 +143,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { /** * Task input. */ - protected class Input implements GridHadoopTaskInput { + protected class Input implements HadoopTaskInput { /** */ private int idx = -1; @@ -163,7 +163,7 @@ public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { * @param taskCtx Task context. * @throws IgniteCheckedException If failed. */ - public Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException { cap = capacity(); keyReader = new Reader(taskCtx.keySerialization()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java index e1fa1f1..5def6d3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java @@ -42,14 +42,14 @@ public interface HadoopMultimap extends AutoCloseable { * @return Adder. * @throws IgniteCheckedException If failed. */ - public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException; + public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException; /** * @param taskCtx Task context. * @return Task input. * @throws IgniteCheckedException If failed. */ - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) + public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException; /** {@inheritDoc} */ @@ -58,7 +58,7 @@ public interface HadoopMultimap extends AutoCloseable { /** * Adder. */ - public interface Adder extends GridHadoopTaskOutput { + public interface Adder extends HadoopTaskOutput { /** * @param in Data input. * @param reuse Reusable key. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java index 4aa6e9e..5afcbc9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -110,7 +110,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { private Object tmp; /** */ - private final GridHadoopSerialization ser; + private final HadoopSerialization ser; /** */ private final HadoopDataInStream in = new HadoopDataInStream(mem); @@ -118,7 +118,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** * @param ser Serialization. */ - protected ReaderBase(GridHadoopSerialization ser) { + protected ReaderBase(HadoopSerialization ser) { assert ser != null; this.ser = ser; @@ -172,10 +172,10 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { */ protected abstract class AdderBase implements Adder { /** */ - protected final GridHadoopSerialization keySer; + protected final HadoopSerialization keySer; /** */ - protected final GridHadoopSerialization valSer; + protected final HadoopSerialization valSer; /** */ private final HadoopDataOutStream out; @@ -190,7 +190,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { * @param ctx Task context. * @throws IgniteCheckedException If failed. */ - protected AdderBase(GridHadoopTaskContext ctx) throws IgniteCheckedException { + protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException { valSer = ctx.valueSerialization(); keySer = ctx.keySerialization(); @@ -259,7 +259,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { * @return Page pointer. * @throws IgniteCheckedException If failed. */ - protected long write(int off, Object o, GridHadoopSerialization ser) throws IgniteCheckedException { + protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException { writeStart = fixAlignment(); if (off != 0) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java index 07bae6b..c7bcda9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java @@ -93,12 +93,12 @@ public class HadoopSkipList extends HadoopMultimapBase { } /** {@inheritDoc} */ - @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { return new AdderImpl(ctx); } /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { Input in = new Input(taskCtx); Comparator<Object> grpCmp = taskCtx.groupComparator(); @@ -243,7 +243,7 @@ public class HadoopSkipList extends HadoopMultimapBase { /** * @param ser Serialization. */ - protected Reader(GridHadoopSerialization ser) { + protected Reader(HadoopSerialization ser) { super(ser); } @@ -285,7 +285,7 @@ public class HadoopSkipList extends HadoopMultimapBase { * @param ctx Task context. * @throws IgniteCheckedException If failed. */ - protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException { + protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { super(ctx); keyReader = new Reader(keySer); @@ -570,7 +570,7 @@ public class HadoopSkipList extends HadoopMultimapBase { /** * Task input. */ - private class Input implements GridHadoopTaskInput { + private class Input implements HadoopTaskInput { /** */ private long metaPtr = heads; @@ -584,7 +584,7 @@ public class HadoopSkipList extends HadoopMultimapBase { * @param taskCtx Task context. * @throws IgniteCheckedException If failed. */ - private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException { keyReader = new Reader(taskCtx.keySerialization()); valReader = new Reader(taskCtx.valueSerialization()); } @@ -616,7 +616,7 @@ public class HadoopSkipList extends HadoopMultimapBase { /** * Grouped input using grouping comparator. */ - private class GroupedInput implements GridHadoopTaskInput { + private class GroupedInput implements HadoopTaskInput { /** */ private final Comparator<Object> grpCmp; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java index e217c57..9858e12 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java @@ -69,7 +69,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { + @Override public void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + ", tasksCnt=" + tasks.size() + ']'); @@ -101,11 +101,11 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { jobTracker.onTaskFinished(info, status); } - @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException { return ctx.shuffle().input(taskCtx); } - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException { return ctx.shuffle().output(taskCtx); } }; @@ -121,8 +121,8 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { * for this job ID. * <p> * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. * * @param jobId Job ID to cancel. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 5b10d6f..4776321 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -28,7 +28,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; /** * Runnable task. @@ -41,7 +41,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { private final IgniteLogger log; /** */ - private final GridHadoopJob job; + private final HadoopJob job; /** Task to run. */ private final GridHadoopTaskInfo info; @@ -59,7 +59,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { private HadoopMultimap combinerInput; /** */ - private volatile GridHadoopTaskContext ctx; + private volatile HadoopTaskContext ctx; /** Set if task is to cancelling. */ private volatile boolean cancelled; @@ -74,7 +74,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { * @param info Task info. * @param nodeId Node id. */ - protected HadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, + protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, UUID nodeId) { this.nodeId = nodeId; this.log = log.getLogger(HadoopRunnableTask.class); @@ -165,8 +165,8 @@ public abstract class HadoopRunnableTask implements Callable<Void> { if (cancelled) throw new HadoopTaskCancelledException("Task cancelled."); - try (GridHadoopTaskOutput out = createOutputInternal(ctx); - GridHadoopTaskInput in = createInputInternal(ctx)) { + try (HadoopTaskOutput out = createOutputInternal(ctx); + HadoopTaskInput in = createInputInternal(ctx)) { ctx.input(in); ctx.output(out); @@ -198,7 +198,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { + private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { switch (ctx.taskInfo().type()) { case SETUP: case MAP: @@ -221,21 +221,21 @@ public abstract class HadoopRunnableTask implements Callable<Void> { * @return Input. * @throws IgniteCheckedException If failed. */ - protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException; + protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException; /** * @param ctx Task info. * @return Output. * @throws IgniteCheckedException If failed. */ - protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException; + protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException; /** * @param ctx Task info. * @return Task output. * @throws IgniteCheckedException If failed. */ - private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { + private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { switch (ctx.taskInfo().type()) { case SETUP: case REDUCE: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java index 2da2373..c2002e6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java @@ -34,15 +34,15 @@ public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { * @param tasks Tasks. * @throws IgniteCheckedException If failed. */ - public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException; + public abstract void run(final HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException; /** * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks * for this job ID. * <p> * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. * * @param jobId Job ID to cancel. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index f05761e..db95b2f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -174,7 +174,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { } } else if (ctx.isParticipating(meta)) { - GridHadoopJob job; + HadoopJob job; try { job = jobTracker.job(meta.jobId(), meta.jobInfo()); @@ -191,7 +191,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") - @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { + @Override public void run(final HadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { if (!busyLock.tryReadLock()) { if (log.isDebugEnabled()) log.debug("Failed to start hadoop tasks (grid is stopping, will ignore)."); @@ -202,10 +202,10 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { try { HadoopProcess proc = runningProcsByJobId.get(job.id()); - GridHadoopTaskType taskType = F.first(tasks).type(); + HadoopTaskType taskType = F.first(tasks).type(); - if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT || - taskType == GridHadoopTaskType.COMMIT) { + if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT || + taskType == HadoopTaskType.COMMIT) { if (proc == null || proc.terminated()) { runningProcsByJobId.remove(job.id(), proc); @@ -269,7 +269,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job instance. * @param tasks Collection of tasks to execute in started process. */ - private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) + private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { // Must synchronize since concurrent process crash may happen and will receive onConnectionLost(). proc.lock(); @@ -325,7 +325,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job instance. * @param plan Map reduce plan. */ - private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) { + private HadoopProcess startProcess(final HadoopJob job, final GridHadoopMapReducePlan plan) { final UUID childProcId = UUID.randomUUID(); GridHadoopJobId jobId = job.id(); @@ -494,7 +494,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @return Started process. */ private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta, - GridHadoopJob job) throws Exception { + HadoopJob job) throws Exception { String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId; if (log.isDebugEnabled()) @@ -604,7 +604,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { * @param job Job. * @param plan Map reduce plan. */ - private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) { + private void prepareForJob(HadoopProcess proc, HadoopJob job, GridHadoopMapReducePlan plan) { try { comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), plan.reducers(), plan.reducers(ctx.localNodeId())));