# IGNITE-386: Moving core classes (6).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/06525cad Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/06525cad Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/06525cad Branch: refs/heads/ignite-386 Commit: 06525cad18194d779d9cad4ce67bf6ac9933d768 Parents: 4c85f12 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 17:01:53 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 17:01:53 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgniteEx.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../internal/processors/hadoop/GridHadoop.java | 86 --- .../processors/hadoop/GridHadoopCounter.java | 44 -- .../hadoop/GridHadoopCounterWriter.java | 36 -- .../processors/hadoop/GridHadoopCounters.java | 4 +- .../processors/hadoop/GridHadoopFileBlock.java | 162 ------ .../processors/hadoop/GridHadoopInputSplit.java | 54 -- .../processors/hadoop/GridHadoopJob.java | 102 ---- .../processors/hadoop/GridHadoopJobInfo.java | 4 +- .../hadoop/GridHadoopMapReducePlan.java | 2 +- .../hadoop/GridHadoopMapReducePlanner.java | 2 +- .../hadoop/GridHadoopPartitioner.java | 33 -- .../hadoop/GridHadoopSerialization.java | 54 -- .../processors/hadoop/GridHadoopTask.java | 72 --- .../hadoop/GridHadoopTaskContext.java | 189 ------- .../processors/hadoop/GridHadoopTaskInfo.java | 16 +- .../processors/hadoop/GridHadoopTaskInput.java | 55 -- .../processors/hadoop/GridHadoopTaskOutput.java | 40 -- .../processors/hadoop/GridHadoopTaskType.java | 56 -- .../internal/processors/hadoop/Hadoop.java | 86 +++ .../processors/hadoop/HadoopCounter.java | 44 ++ .../processors/hadoop/HadoopCounterWriter.java | 36 ++ .../processors/hadoop/HadoopFileBlock.java | 162 ++++++ .../processors/hadoop/HadoopInputSplit.java | 54 ++ .../internal/processors/hadoop/HadoopJob.java | 102 ++++ .../processors/hadoop/HadoopPartitioner.java | 33 ++ .../processors/hadoop/HadoopSerialization.java | 54 ++ .../internal/processors/hadoop/HadoopTask.java | 72 +++ .../processors/hadoop/HadoopTaskContext.java | 189 +++++++ .../processors/hadoop/HadoopTaskInput.java | 55 ++ .../processors/hadoop/HadoopTaskOutput.java | 40 ++ .../processors/hadoop/HadoopTaskType.java | 56 ++ .../hadoop/IgniteHadoopNoopProcessor.java | 2 +- .../hadoop/IgniteHadoopProcessorAdapter.java | 2 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 2 +- .../processors/hadoop/HadoopCounters.java | 6 +- .../processors/hadoop/HadoopDefaultJobInfo.java | 4 +- .../internal/processors/hadoop/HadoopImpl.java | 2 +- .../processors/hadoop/HadoopProcessor.java | 4 +- .../hadoop/counter/HadoopCounterAdapter.java | 2 +- .../hadoop/counter/HadoopCountersImpl.java | 24 +- .../hadoop/counter/HadoopLongCounter.java | 2 +- .../counter/HadoopPerformanceCounter.java | 4 +- .../hadoop/jobtracker/HadoopJobMetadata.java | 10 +- .../hadoop/jobtracker/HadoopJobTracker.java | 68 +-- .../planner/HadoopDefaultMapReducePlan.java | 8 +- .../planner/HadoopDefaultMapReducePlanner.java | 24 +- .../proto/HadoopProtocolJobCountersTask.java | 2 +- .../proto/HadoopProtocolJobStatusTask.java | 2 +- .../hadoop/proto/HadoopProtocolKillJobTask.java | 2 +- .../proto/HadoopProtocolNextTaskIdTask.java | 2 +- .../proto/HadoopProtocolSubmitJobTask.java | 2 +- .../hadoop/proto/HadoopProtocolTaskAdapter.java | 2 +- .../hadoop/shuffle/HadoopShuffle.java | 4 +- .../hadoop/shuffle/HadoopShuffleJob.java | 30 +- .../HadoopConcurrentHashMultimap.java | 6 +- .../shuffle/collections/HadoopHashMultimap.java | 4 +- .../collections/HadoopHashMultimapBase.java | 8 +- .../shuffle/collections/HadoopMultimap.java | 6 +- .../shuffle/collections/HadoopMultimapBase.java | 12 +- .../shuffle/collections/HadoopSkipList.java | 14 +- .../HadoopEmbeddedTaskExecutor.java | 10 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +- .../taskexecutor/HadoopTaskExecutorAdapter.java | 6 +- .../external/HadoopExternalTaskExecutor.java | 18 +- .../child/HadoopChildProcessRunner.java | 8 +- .../hadoop/v1/HadoopV1CleanupTask.java | 2 +- .../processors/hadoop/v1/HadoopV1MapTask.java | 10 +- .../hadoop/v1/HadoopV1OutputCollector.java | 4 +- .../hadoop/v1/HadoopV1Partitioner.java | 2 +- .../hadoop/v1/HadoopV1ReduceTask.java | 6 +- .../processors/hadoop/v1/HadoopV1Reporter.java | 4 +- .../processors/hadoop/v1/HadoopV1SetupTask.java | 2 +- .../processors/hadoop/v1/HadoopV1Splitter.java | 10 +- .../processors/hadoop/v1/HadoopV1Task.java | 2 +- .../hadoop/v2/HadoopExternalSplit.java | 2 +- .../hadoop/v2/HadoopSerializationWrapper.java | 2 +- .../hadoop/v2/HadoopSplitWrapper.java | 2 +- .../processors/hadoop/v2/HadoopV2Context.java | 12 +- .../processors/hadoop/v2/HadoopV2Job.java | 24 +- .../processors/hadoop/v2/HadoopV2MapTask.java | 6 +- .../hadoop/v2/HadoopV2Partitioner.java | 2 +- .../processors/hadoop/v2/HadoopV2Splitter.java | 10 +- .../processors/hadoop/v2/HadoopV2Task.java | 4 +- .../hadoop/v2/HadoopV2TaskContext.java | 26 +- .../hadoop/v2/HadoopWritableSerialization.java | 2 +- .../hadoop/HadoopClientProtocolSelfTest.java | 16 +- .../hadoop/GridHadoopAbstractSelfTest.java | 222 -------- .../hadoop/GridHadoopAbstractWordCountTest.java | 138 ----- .../hadoop/GridHadoopCommandLineTest.java | 440 --------------- .../hadoop/GridHadoopFileSystemsTest.java | 177 ------ .../hadoop/GridHadoopGroupingTest.java | 286 ---------- .../hadoop/GridHadoopJobTrackerSelfTest.java | 330 ----------- .../GridHadoopMapReduceEmbeddedSelfTest.java | 245 --------- .../hadoop/GridHadoopMapReduceTest.java | 196 ------- .../hadoop/GridHadoopSortingExternalTest.java | 32 -- .../hadoop/GridHadoopSortingTest.java | 281 ---------- .../hadoop/GridHadoopTaskExecutionSelfTest.java | 551 ------------------- .../hadoop/GridHadoopTasksAllVersionsTest.java | 259 --------- .../hadoop/GridHadoopTasksV1Test.java | 57 -- .../hadoop/GridHadoopTasksV2Test.java | 75 --- .../GridHadoopTestRoundRobinMrPlanner.java | 8 +- .../hadoop/GridHadoopValidationSelfTest.java | 53 -- .../hadoop/HadoopAbstractSelfTest.java | 222 ++++++++ .../hadoop/HadoopAbstractWordCountTest.java | 138 +++++ .../hadoop/HadoopCommandLineTest.java | 440 +++++++++++++++ .../HadoopDefaultMapReducePlannerSelfTest.java | 62 +-- .../hadoop/HadoopFileSystemsTest.java | 177 ++++++ .../processors/hadoop/HadoopGroupingTest.java | 286 ++++++++++ .../hadoop/HadoopJobTrackerSelfTest.java | 330 +++++++++++ .../hadoop/HadoopMapReduceEmbeddedSelfTest.java | 245 +++++++++ .../processors/hadoop/HadoopMapReduceTest.java | 196 +++++++ .../HadoopSerializationWrapperSelfTest.java | 4 +- .../hadoop/HadoopSortingExternalTest.java | 32 ++ .../processors/hadoop/HadoopSortingTest.java | 281 ++++++++++ .../hadoop/HadoopSplitWrapperSelfTest.java | 2 +- .../hadoop/HadoopTaskExecutionSelfTest.java | 551 +++++++++++++++++++ .../hadoop/HadoopTasksAllVersionsTest.java | 259 +++++++++ .../processors/hadoop/HadoopTasksV1Test.java | 57 ++ .../processors/hadoop/HadoopTasksV2Test.java | 75 +++ .../hadoop/HadoopTestTaskContext.java | 12 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 10 +- .../hadoop/HadoopValidationSelfTest.java | 53 ++ .../collections/GridHadoopAbstractMapTest.java | 152 ----- ...ridHadoopConcurrentHashMultimapSelftest.java | 12 +- .../collections/GridHadoopHashMapSelfTest.java | 170 ------ .../collections/GridHadoopSkipListSelfTest.java | 303 ---------- .../collections/HadoopAbstractMapTest.java | 152 +++++ .../collections/HadoopHashMapSelfTest.java | 170 ++++++ .../collections/HadoopSkipListSelfTest.java | 303 ++++++++++ .../streams/GridHadoopDataStreamSelfTest.java | 151 ----- .../streams/HadoopDataStreamSelfTest.java | 151 +++++ ...GridHadoopExternalTaskExecutionSelfTest.java | 210 ------- .../HadoopExternalTaskExecutionSelfTest.java | 210 +++++++ .../testsuites/IgniteHadoopTestSuite.java | 32 +- 136 files changed, 5634 insertions(+), 5634 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java index 3c35a08..7557efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java @@ -124,7 +124,7 @@ public interface IgniteEx extends Ignite { * * @return Hadoop. */ - public GridHadoop hadoop(); + public Hadoop hadoop(); /** {@inheritDoc} */ @Override IgniteClusterEx cluster(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index f46d071..16ffc30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2399,7 +2399,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { + @Override public Hadoop hadoop() { guard(); try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java deleted file mode 100644 index c262d48..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoop.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.jetbrains.annotations.*; - -/** - * Hadoop facade providing access to Ignite Hadoop features. - */ -public interface GridHadoop { - /** - * Gets Hadoop module configuration. - * - * @return Hadoop module configuration. - */ - public GridHadoopConfiguration configuration(); - - /** - * Generate next job ID. - * - * @return Next job ID. - */ - public GridHadoopJobId nextJobId(); - - /** - * Submits job to job tracker. - * - * @param jobId Job ID to submit. - * @param jobInfo Job info to submit. - * @return Execution future. - */ - public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo); - - /** - * Gets Hadoop job execution status. - * - * @param jobId Job ID to get status for. - * @return Job execution status or {@code null} in case job with the given ID is not found. - * @throws IgniteCheckedException If failed. - */ - @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Returns job counters. - * - * @param jobId Job ID to get counters for. - * @return Job counters object. - * @throws IgniteCheckedException If failed. - */ - public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Gets Hadoop finish future for particular job. - * - * @param jobId Job ID. - * @return Job finish future or {@code null} in case job with the given ID is not found. - * @throws IgniteCheckedException If failed. - */ - @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Kills job. - * - * @param jobId Job ID. - * @return {@code True} if job was killed. - * @throws IgniteCheckedException If failed. - */ - public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java deleted file mode 100644 index 83902dd..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Hadoop counter. - */ -public interface GridHadoopCounter { - /** - * Gets name. - * - * @return Name of the counter. - */ - public String name(); - - /** - * Gets counter group. - * - * @return Counter group's name. - */ - public String group(); - - /** - * Merge the given counter to this counter. - * - * @param cntr Counter to merge into this counter. - */ - public void merge(GridHadoopCounter cntr); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java deleted file mode 100644 index af72e69..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounterWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * The object that writes some system counters to some storage for each running job. This operation is a part of - * whole statistics collection process. - */ -public interface GridHadoopCounterWriter { - /** - * Writes counters of given job to some statistics storage. - * - * @param jobInfo Job info. - * @param jobId Job id. - * @param cntrs Counters. - * @throws IgniteCheckedException If failed. - */ - public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java index 91eb8a1..3d577b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java @@ -31,14 +31,14 @@ public interface GridHadoopCounters { * @param cls Class for new instance creation if it's needed. * @return The counter that was found or added or {@code null} if create is false. */ - <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls); + <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls); /** * Returns all existing counters. * * @return Collection of counters. */ - Collection<GridHadoopCounter> all(); + Collection<HadoopCounter> all(); /** * Merges all counters from another store with existing counters. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java deleted file mode 100644 index fae111a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileBlock.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Hadoop file block. - */ -public class GridHadoopFileBlock extends GridHadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - protected URI file; - - /** */ - @GridToStringInclude - protected long start; - - /** */ - @GridToStringInclude - protected long len; - - /** - * Creates new file block. - */ - public GridHadoopFileBlock() { - // No-op. - } - - /** - * Creates new file block. - * - * @param hosts List of hosts where the block resides. - * @param file File URI. - * @param start Start position of the block in the file. - * @param len Length of the block. - */ - public GridHadoopFileBlock(String[] hosts, URI file, long start, long len) { - A.notNull(hosts, "hosts", file, "file"); - - this.hosts = hosts; - this.file = file; - this.start = start; - this.len = len; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(file()); - out.writeLong(start()); - out.writeLong(length()); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - file = (URI)in.readObject(); - start = in.readLong(); - len = in.readLong(); - } - - /** - * @return Length. - */ - public long length() { - return len; - } - - /** - * @param len New length. - */ - public void length(long len) { - this.len = len; - } - - /** - * @return Start. - */ - public long start() { - return start; - } - - /** - * @param start New start. - */ - public void start(long start) { - this.start = start; - } - - /** - * @return File. - */ - public URI file() { - return file; - } - - /** - * @param file New file. - */ - public void file(URI file) { - this.file = file; - } - - /** - * @param hosts New hosts. - */ - public void hosts(String[] hosts) { - A.notNull(hosts, "hosts"); - - this.hosts = hosts; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridHadoopFileBlock)) - return false; - - GridHadoopFileBlock that = (GridHadoopFileBlock)o; - - return len == that.len && start == that.start && file.equals(that.file); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = file.hashCode(); - - res = 31 * res + (int)(start ^ (start >>> 32)); - res = 31 * res + (int)(len ^ (len >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridHadoopFileBlock.class, this, "hosts", Arrays.toString(hosts)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java deleted file mode 100644 index e68a6f5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopInputSplit.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.io.*; - -/** - * Abstract fragment of an input data source. - */ -public abstract class GridHadoopInputSplit implements Externalizable { - /** */ - protected String[] hosts; - - /** - * Array of hosts where this input split resides. - * - * @return Hosts. - */ - public String[] hosts() { - assert hosts != null; - - return hosts; - } - - /** - * This method must be implemented for purpose of internal implementation. - * - * @param obj Another object. - * @return {@code true} If objects are equal. - */ - @Override public abstract boolean equals(Object obj); - - /** - * This method must be implemented for purpose of internal implementation. - * - * @return Hash code of the object. - */ - @Override public abstract int hashCode(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java deleted file mode 100644 index f7ea105..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJob.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Hadoop job. - */ -public interface GridHadoopJob { - /** - * Gets job ID. - * - * @return Job ID. - */ - public GridHadoopJobId id(); - - /** - * Gets job information. - * - * @return Job information. - */ - public GridHadoopJobInfo info(); - - /** - * Gets collection of input splits for this job. - * - * @return Input splits. - */ - public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException; - - /** - * Returns context for task execution. - * - * @param info Task info. - * @return Task Context. - * @throws IgniteCheckedException If failed. - */ - public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Does all the needed initialization for the job. Will be called on each node where tasks for this job must - * be executed. - * <p> - * If job is running in external mode this method will be called on instance in Ignite node with parameter - * {@code false} and on instance in external process with parameter {@code true}. - * - * @param external If {@code true} then this job instance resides in external process. - * @param locNodeId Local node ID. - * @throws IgniteCheckedException If failed. - */ - public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; - - /** - * Release all the resources. - * <p> - * If job is running in external mode this method will be called on instance in Ignite node with parameter - * {@code false} and on instance in external process with parameter {@code true}. - * - * @param external If {@code true} then this job instance resides in external process. - * @throws IgniteCheckedException If failed. - */ - public void dispose(boolean external) throws IgniteCheckedException; - - /** - * Prepare local environment for the task. - * - * @param info Task info. - * @throws IgniteCheckedException If failed. - */ - public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Cleans up local environment of the task. - * - * @param info Task info. - * @throws IgniteCheckedException If failed. - */ - public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; - - /** - * Cleans up the job staging directory. - */ - void cleanupStagingDirectory(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java index 9a891f4..6c75e5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java @@ -51,7 +51,7 @@ public interface GridHadoopJobInfo extends Serializable { /** * Creates new job instance for the given ID. - * {@link GridHadoopJobInfo} is reusable for multiple jobs while {@link GridHadoopJob} is for one job execution. + * {@link GridHadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution. * This method will be called once for the same ID on one node, though it can be called on the same host * multiple times from different processes (in case of multiple nodes on the same host or external execution). * @@ -60,7 +60,7 @@ public interface GridHadoopJobInfo extends Serializable { * @return Job. * @throws IgniteCheckedException If failed. */ - GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java index 2fd5160..bb638fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java @@ -32,7 +32,7 @@ public interface GridHadoopMapReducePlan extends Serializable { * @param nodeId Node ID to check. * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. */ - @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId); + @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); /** * Gets reducer IDs that should be started on given node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java index 56c6913..0119eec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java @@ -35,6 +35,6 @@ public interface GridHadoopMapReducePlanner { * @param oldPlan Old plan in case of partial failure. * @return Map reduce plan. */ - public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, + public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java deleted file mode 100644 index fcde424..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Partitioner. - */ -public interface GridHadoopPartitioner { - /** - * Gets partition which is actually a reducer index for the given key and value pair. - * - * @param key Key. - * @param val Value. - * @param parts Number of partitions. - * @return Partition. - */ - public int partition(Object key, Object val, int parts); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java deleted file mode 100644 index 5bc8806..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized. - */ -public interface GridHadoopSerialization extends AutoCloseable { - /** - * Writes the given object to output. - * - * @param out Output. - * @param obj Object to serialize. - * @throws IgniteCheckedException If failed. - */ - public void write(DataOutput out, Object obj) throws IgniteCheckedException; - - /** - * Reads object from the given input optionally reusing given instance. - * - * @param in Input. - * @param obj Object. - * @return New object or reused instance. - * @throws IgniteCheckedException If failed. - */ - public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException; - - /** - * Finalise the internal objects. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java deleted file mode 100644 index be34f81..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.io.*; - -/** - * Hadoop task. - */ -public abstract class GridHadoopTask { - /** */ - private GridHadoopTaskInfo taskInfo; - - /** - * Creates task. - * - * @param taskInfo Task info. - */ - protected GridHadoopTask(GridHadoopTaskInfo taskInfo) { - assert taskInfo != null; - - this.taskInfo = taskInfo; - } - - /** - * For {@link Externalizable}. - */ - @SuppressWarnings("ConstructorNotProtectedInAbstractClass") - public GridHadoopTask() { - // No-op. - } - - /** - * Gets task info. - * - * @return Task info. - */ - public GridHadoopTaskInfo info() { - return taskInfo; - } - - /** - * Runs task. - * - * @param taskCtx Context. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException; - - /** - * Interrupts task execution. - */ - public abstract void cancel(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java deleted file mode 100644 index bedd93b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Task context. - */ -public abstract class GridHadoopTaskContext { - /** */ - private final GridHadoopJob job; - - /** */ - private GridHadoopTaskInput input; - - /** */ - private GridHadoopTaskOutput output; - - /** */ - private GridHadoopTaskInfo taskInfo; - - /** - * @param taskInfo Task info. - * @param job Job. - */ - protected GridHadoopTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job) { - this.taskInfo = taskInfo; - this.job = job; - } - - /** - * Gets task info. - * - * @return Task info. - */ - public GridHadoopTaskInfo taskInfo() { - return taskInfo; - } - - /** - * Set a new task info. - * - * @param info Task info. - */ - public void taskInfo(GridHadoopTaskInfo info) { - taskInfo = info; - } - - /** - * Gets task output. - * - * @return Task output. - */ - public GridHadoopTaskOutput output() { - return output; - } - - /** - * Gets task input. - * - * @return Task input. - */ - public GridHadoopTaskInput input() { - return input; - } - - /** - * @return Job. - */ - public GridHadoopJob job() { - return job; - } - - /** - * Gets counter for the given name. - * - * @param grp Counter group's name. - * @param name Counter name. - * @return Counter. - */ - public abstract <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls); - - /** - * Gets all known counters. - * - * @return Unmodifiable collection of counters. - */ - public abstract GridHadoopCounters counters(); - - /** - * Sets input of the task. - * - * @param in Input. - */ - public void input(GridHadoopTaskInput in) { - input = in; - } - - /** - * Sets output of the task. - * - * @param out Output. - */ - public void output(GridHadoopTaskOutput out) { - output = out; - } - - /** - * Gets partitioner. - * - * @return Partitioner. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopPartitioner partitioner() throws IgniteCheckedException; - - /** - * Gets serializer for values. - * - * @return Serializer for keys. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopSerialization keySerialization() throws IgniteCheckedException; - - /** - * Gets serializer for values. - * - * @return Serializer for values. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopSerialization valueSerialization() throws IgniteCheckedException; - - /** - * Gets sorting comparator. - * - * @return Comparator for sorting. - */ - public abstract Comparator<Object> sortComparator(); - - /** - * Gets comparator for grouping on combine or reduce operation. - * - * @return Comparator. - */ - public abstract Comparator<Object> groupComparator(); - - /** - * Execute current task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void run() throws IgniteCheckedException; - - /** - * Cancel current task execution. - */ - public abstract void cancel(); - - /** - * Prepare local environment for the task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void prepareTaskEnvironment() throws IgniteCheckedException; - - /** - * Cleans up local environment of the task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java index 75e06ca..7107f17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java @@ -30,7 +30,7 @@ public class GridHadoopTaskInfo implements Externalizable { private static final long serialVersionUID = 0L; /** */ - private GridHadoopTaskType type; + private HadoopTaskType type; /** */ private GridHadoopJobId jobId; @@ -42,7 +42,7 @@ public class GridHadoopTaskInfo implements Externalizable { private int attempt; /** */ - private GridHadoopInputSplit inputSplit; + private HadoopInputSplit inputSplit; /** * For {@link Externalizable}. @@ -60,8 +60,8 @@ public class GridHadoopTaskInfo implements Externalizable { * @param attempt Attempt for this task. * @param inputSplit Input split. */ - public GridHadoopTaskInfo(GridHadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt, - @Nullable GridHadoopInputSplit inputSplit) { + public GridHadoopTaskInfo(HadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt, + @Nullable HadoopInputSplit inputSplit) { this.type = type; this.jobId = jobId; this.taskNum = taskNum; @@ -80,17 +80,17 @@ public class GridHadoopTaskInfo implements Externalizable { /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - type = GridHadoopTaskType.fromOrdinal(in.readByte()); + type = HadoopTaskType.fromOrdinal(in.readByte()); jobId = (GridHadoopJobId)in.readObject(); taskNum = in.readInt(); attempt = in.readInt(); - inputSplit = (GridHadoopInputSplit)in.readObject(); + inputSplit = (HadoopInputSplit)in.readObject(); } /** * @return Type. */ - public GridHadoopTaskType type() { + public HadoopTaskType type() { return type; } @@ -118,7 +118,7 @@ public class GridHadoopTaskInfo implements Externalizable { /** * @return Input split. */ - @Nullable public GridHadoopInputSplit inputSplit() { + @Nullable public HadoopInputSplit inputSplit() { return inputSplit; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java deleted file mode 100644 index 479cf6d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Task input. - */ -public interface GridHadoopTaskInput extends AutoCloseable { - /** - * Moves cursor to the next element. - * - * @return {@code false} If input is exceeded. - */ - boolean next(); - - /** - * Gets current key. - * - * @return Key. - */ - Object key(); - - /** - * Gets values for current key. - * - * @return Values. - */ - Iterator<?> values(); - - /** - * Closes input. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java deleted file mode 100644 index 6480d8d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * Task output. - */ -public interface GridHadoopTaskOutput extends AutoCloseable { - /** - * Writes key and value to the output. - * - * @param key Key. - * @param val Value. - */ - public void write(Object key, Object val) throws IgniteCheckedException; - - /** - * Closes output. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java deleted file mode 100644 index 404d6b8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jetbrains.annotations.*; - -/** -* Task type. -*/ -public enum GridHadoopTaskType { - /** Setup task. */ - SETUP, - - /** Map task. */ - MAP, - - /** Reduce task. */ - REDUCE, - - /** Combine task. */ - COMBINE, - - /** Commit task. */ - COMMIT, - - /** Abort task. */ - ABORT; - - /** Enumerated values. */ - private static final GridHadoopTaskType[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static GridHadoopTaskType fromOrdinal(byte ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java new file mode 100644 index 0000000..1df1378 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.jetbrains.annotations.*; + +/** + * Hadoop facade providing access to Ignite Hadoop features. + */ +public interface Hadoop { + /** + * Gets Hadoop module configuration. + * + * @return Hadoop module configuration. + */ + public GridHadoopConfiguration configuration(); + + /** + * Generate next job ID. + * + * @return Next job ID. + */ + public GridHadoopJobId nextJobId(); + + /** + * Submits job to job tracker. + * + * @param jobId Job ID to submit. + * @param jobInfo Job info to submit. + * @return Execution future. + */ + public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo); + + /** + * Gets Hadoop job execution status. + * + * @param jobId Job ID to get status for. + * @return Job execution status or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Returns job counters. + * + * @param jobId Job ID to get counters for. + * @return Job counters object. + * @throws IgniteCheckedException If failed. + */ + public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Gets Hadoop finish future for particular job. + * + * @param jobId Job ID. + * @return Job finish future or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java new file mode 100644 index 0000000..581144a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Hadoop counter. + */ +public interface HadoopCounter { + /** + * Gets name. + * + * @return Name of the counter. + */ + public String name(); + + /** + * Gets counter group. + * + * @return Counter group's name. + */ + public String group(); + + /** + * Merge the given counter to this counter. + * + * @param cntr Counter to merge into this counter. + */ + public void merge(HadoopCounter cntr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java new file mode 100644 index 0000000..0d33fd2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +/** + * The object that writes some system counters to some storage for each running job. This operation is a part of + * whole statistics collection process. + */ +public interface HadoopCounterWriter { + /** + * Writes counters of given job to some statistics storage. + * + * @param jobInfo Job info. + * @param jobId Job id. + * @param cntrs Counters. + * @throws IgniteCheckedException If failed. + */ + public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java new file mode 100644 index 0000000..223e572 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Hadoop file block. + */ +public class HadoopFileBlock extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + protected URI file; + + /** */ + @GridToStringInclude + protected long start; + + /** */ + @GridToStringInclude + protected long len; + + /** + * Creates new file block. + */ + public HadoopFileBlock() { + // No-op. + } + + /** + * Creates new file block. + * + * @param hosts List of hosts where the block resides. + * @param file File URI. + * @param start Start position of the block in the file. + * @param len Length of the block. + */ + public HadoopFileBlock(String[] hosts, URI file, long start, long len) { + A.notNull(hosts, "hosts", file, "file"); + + this.hosts = hosts; + this.file = file; + this.start = start; + this.len = len; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(file()); + out.writeLong(start()); + out.writeLong(length()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + file = (URI)in.readObject(); + start = in.readLong(); + len = in.readLong(); + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** + * @param len New length. + */ + public void length(long len) { + this.len = len; + } + + /** + * @return Start. + */ + public long start() { + return start; + } + + /** + * @param start New start. + */ + public void start(long start) { + this.start = start; + } + + /** + * @return File. + */ + public URI file() { + return file; + } + + /** + * @param file New file. + */ + public void file(URI file) { + this.file = file; + } + + /** + * @param hosts New hosts. + */ + public void hosts(String[] hosts) { + A.notNull(hosts, "hosts"); + + this.hosts = hosts; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopFileBlock)) + return false; + + HadoopFileBlock that = (HadoopFileBlock)o; + + return len == that.len && start == that.start && file.equals(that.file); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = file.hashCode(); + + res = 31 * res + (int)(start ^ (start >>> 32)); + res = 31 * res + (int)(len ^ (len >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopFileBlock.class, this, "hosts", Arrays.toString(hosts)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java new file mode 100644 index 0000000..0c94012 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.*; + +/** + * Abstract fragment of an input data source. + */ +public abstract class HadoopInputSplit implements Externalizable { + /** */ + protected String[] hosts; + + /** + * Array of hosts where this input split resides. + * + * @return Hosts. + */ + public String[] hosts() { + assert hosts != null; + + return hosts; + } + + /** + * This method must be implemented for purpose of internal implementation. + * + * @param obj Another object. + * @return {@code true} If objects are equal. + */ + @Override public abstract boolean equals(Object obj); + + /** + * This method must be implemented for purpose of internal implementation. + * + * @return Hash code of the object. + */ + @Override public abstract int hashCode(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java new file mode 100644 index 0000000..facb0ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Hadoop job. + */ +public interface HadoopJob { + /** + * Gets job ID. + * + * @return Job ID. + */ + public GridHadoopJobId id(); + + /** + * Gets job information. + * + * @return Job information. + */ + public GridHadoopJobInfo info(); + + /** + * Gets collection of input splits for this job. + * + * @return Input splits. + */ + public Collection<HadoopInputSplit> input() throws IgniteCheckedException; + + /** + * Returns context for task execution. + * + * @param info Task info. + * @return Task Context. + * @throws IgniteCheckedException If failed. + */ + public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Does all the needed initialization for the job. Will be called on each node where tasks for this job must + * be executed. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @param locNodeId Local node ID. + * @throws IgniteCheckedException If failed. + */ + public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; + + /** + * Release all the resources. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @throws IgniteCheckedException If failed. + */ + public void dispose(boolean external) throws IgniteCheckedException; + + /** + * Prepare local environment for the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up the job staging directory. + */ + void cleanupStagingDirectory(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java new file mode 100644 index 0000000..ec94f81 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Partitioner. + */ +public interface HadoopPartitioner { + /** + * Gets partition which is actually a reducer index for the given key and value pair. + * + * @param key Key. + * @param val Value. + * @param parts Number of partitions. + * @return Partition. + */ + public int partition(Object key, Object val, int parts); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java new file mode 100644 index 0000000..aab803b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized. + */ +public interface HadoopSerialization extends AutoCloseable { + /** + * Writes the given object to output. + * + * @param out Output. + * @param obj Object to serialize. + * @throws IgniteCheckedException If failed. + */ + public void write(DataOutput out, Object obj) throws IgniteCheckedException; + + /** + * Reads object from the given input optionally reusing given instance. + * + * @param in Input. + * @param obj Object. + * @return New object or reused instance. + * @throws IgniteCheckedException If failed. + */ + public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException; + + /** + * Finalise the internal objects. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java new file mode 100644 index 0000000..c6a409f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.io.*; + +/** + * Hadoop task. + */ +public abstract class HadoopTask { + /** */ + private GridHadoopTaskInfo taskInfo; + + /** + * Creates task. + * + * @param taskInfo Task info. + */ + protected HadoopTask(GridHadoopTaskInfo taskInfo) { + assert taskInfo != null; + + this.taskInfo = taskInfo; + } + + /** + * For {@link Externalizable}. + */ + @SuppressWarnings("ConstructorNotProtectedInAbstractClass") + public HadoopTask() { + // No-op. + } + + /** + * Gets task info. + * + * @return Task info. + */ + public GridHadoopTaskInfo info() { + return taskInfo; + } + + /** + * Runs task. + * + * @param taskCtx Context. + * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted. + * @throws IgniteCheckedException If failed. + */ + public abstract void run(HadoopTaskContext taskCtx) throws IgniteCheckedException; + + /** + * Interrupts task execution. + */ + public abstract void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java new file mode 100644 index 0000000..4b66a92 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Task context. + */ +public abstract class HadoopTaskContext { + /** */ + private final HadoopJob job; + + /** */ + private HadoopTaskInput input; + + /** */ + private HadoopTaskOutput output; + + /** */ + private GridHadoopTaskInfo taskInfo; + + /** + * @param taskInfo Task info. + * @param job Job. + */ + protected HadoopTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob job) { + this.taskInfo = taskInfo; + this.job = job; + } + + /** + * Gets task info. + * + * @return Task info. + */ + public GridHadoopTaskInfo taskInfo() { + return taskInfo; + } + + /** + * Set a new task info. + * + * @param info Task info. + */ + public void taskInfo(GridHadoopTaskInfo info) { + taskInfo = info; + } + + /** + * Gets task output. + * + * @return Task output. + */ + public HadoopTaskOutput output() { + return output; + } + + /** + * Gets task input. + * + * @return Task input. + */ + public HadoopTaskInput input() { + return input; + } + + /** + * @return Job. + */ + public HadoopJob job() { + return job; + } + + /** + * Gets counter for the given name. + * + * @param grp Counter group's name. + * @param name Counter name. + * @return Counter. + */ + public abstract <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls); + + /** + * Gets all known counters. + * + * @return Unmodifiable collection of counters. + */ + public abstract GridHadoopCounters counters(); + + /** + * Sets input of the task. + * + * @param in Input. + */ + public void input(HadoopTaskInput in) { + input = in; + } + + /** + * Sets output of the task. + * + * @param out Output. + */ + public void output(HadoopTaskOutput out) { + output = out; + } + + /** + * Gets partitioner. + * + * @return Partitioner. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopPartitioner partitioner() throws IgniteCheckedException; + + /** + * Gets serializer for values. + * + * @return Serializer for keys. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopSerialization keySerialization() throws IgniteCheckedException; + + /** + * Gets serializer for values. + * + * @return Serializer for values. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopSerialization valueSerialization() throws IgniteCheckedException; + + /** + * Gets sorting comparator. + * + * @return Comparator for sorting. + */ + public abstract Comparator<Object> sortComparator(); + + /** + * Gets comparator for grouping on combine or reduce operation. + * + * @return Comparator. + */ + public abstract Comparator<Object> groupComparator(); + + /** + * Execute current task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void run() throws IgniteCheckedException; + + /** + * Cancel current task execution. + */ + public abstract void cancel(); + + /** + * Prepare local environment for the task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void prepareTaskEnvironment() throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInput.java new file mode 100644 index 0000000..ad6446f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInput.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Task input. + */ +public interface HadoopTaskInput extends AutoCloseable { + /** + * Moves cursor to the next element. + * + * @return {@code false} If input is exceeded. + */ + boolean next(); + + /** + * Gets current key. + * + * @return Key. + */ + Object key(); + + /** + * Gets values for current key. + * + * @return Values. + */ + Iterator<?> values(); + + /** + * Closes input. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +}