# ignite-63
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/afd94f76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/afd94f76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/afd94f76 Branch: refs/heads/ignite-63 Commit: afd94f76fd974ef95801073803cfeff08bdf294e Parents: 4b080d9 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jan 22 16:49:10 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jan 22 16:49:17 2015 +0300 ---------------------------------------------------------------------- config/hadoop/default-config.xml | 2 +- .../src/main/java/org/apache/ignite/Ignite.java | 3 +- .../configuration/IgniteConfiguration.java | 2 +- .../org/apache/ignite/hadoop/GridHadoop.java | 86 + .../ignite/hadoop/GridHadoopConfiguration.java | 172 + .../apache/ignite/hadoop/GridHadoopCounter.java | 44 + .../ignite/hadoop/GridHadoopCounterWriter.java | 36 + .../ignite/hadoop/GridHadoopCounters.java | 49 + .../ignite/hadoop/GridHadoopFileBlock.java | 162 + .../ignite/hadoop/GridHadoopInputSplit.java | 54 + .../org/apache/ignite/hadoop/GridHadoopJob.java | 102 + .../apache/ignite/hadoop/GridHadoopJobId.java | 102 + .../apache/ignite/hadoop/GridHadoopJobInfo.java | 83 + .../ignite/hadoop/GridHadoopJobPhase.java | 38 + .../ignite/hadoop/GridHadoopJobProperty.java | 138 + .../ignite/hadoop/GridHadoopJobStatus.java | 207 + .../ignite/hadoop/GridHadoopMapReducePlan.java | 80 + .../hadoop/GridHadoopMapReducePlanner.java | 40 + .../ignite/hadoop/GridHadoopPartitioner.java | 33 + .../ignite/hadoop/GridHadoopSerialization.java | 54 + .../apache/ignite/hadoop/GridHadoopTask.java | 73 + .../ignite/hadoop/GridHadoopTaskContext.java | 189 + .../ignite/hadoop/GridHadoopTaskInfo.java | 153 + .../ignite/hadoop/GridHadoopTaskInput.java | 55 + .../ignite/hadoop/GridHadoopTaskOutput.java | 40 + .../ignite/hadoop/GridHadoopTaskType.java | 56 + 
.../java/org/apache/ignite/hadoop/package.html | 24 + .../org/gridgain/grid/hadoop/GridHadoop.java | 86 - .../grid/hadoop/GridHadoopConfiguration.java | 172 - .../gridgain/grid/hadoop/GridHadoopCounter.java | 44 - .../grid/hadoop/GridHadoopCounterWriter.java | 36 - .../grid/hadoop/GridHadoopCounters.java | 49 - .../grid/hadoop/GridHadoopFileBlock.java | 162 - .../grid/hadoop/GridHadoopInputSplit.java | 54 - .../org/gridgain/grid/hadoop/GridHadoopJob.java | 102 - .../gridgain/grid/hadoop/GridHadoopJobId.java | 102 - .../gridgain/grid/hadoop/GridHadoopJobInfo.java | 84 - .../grid/hadoop/GridHadoopJobPhase.java | 38 - .../grid/hadoop/GridHadoopJobProperty.java | 138 - .../grid/hadoop/GridHadoopJobStatus.java | 207 - .../grid/hadoop/GridHadoopMapReducePlan.java | 80 - .../grid/hadoop/GridHadoopMapReducePlanner.java | 40 - .../grid/hadoop/GridHadoopPartitioner.java | 33 - .../grid/hadoop/GridHadoopSerialization.java | 54 - .../gridgain/grid/hadoop/GridHadoopTask.java | 73 - .../grid/hadoop/GridHadoopTaskContext.java | 189 - .../grid/hadoop/GridHadoopTaskInfo.java | 153 - .../grid/hadoop/GridHadoopTaskInput.java | 55 - .../grid/hadoop/GridHadoopTaskOutput.java | 40 - .../grid/hadoop/GridHadoopTaskType.java | 56 - .../java/org/gridgain/grid/hadoop/package.html | 24 - .../org/gridgain/grid/kernal/GridKernal.java | 2 +- .../hadoop/GridHadoopNoopProcessor.java | 2 +- .../hadoop/GridHadoopProcessorAdapter.java | 2 +- .../client/hadoop/GridHadoopClientProtocol.java | 2 +- .../counter/GridHadoopClientCounters.java | 2 +- .../ignite/hadoop/GridHadoopDefaultJobInfo.java | 163 + .../apache/ignite/hadoop/GridHadoopSetup.java | 506 + .../grid/hadoop/GridHadoopDefaultJobInfo.java | 164 - .../gridgain/grid/hadoop/GridHadoopSetup.java | 506 - .../processors/hadoop/GridHadoopContext.java | 2 +- .../processors/hadoop/GridHadoopImpl.java | 3 +- .../processors/hadoop/GridHadoopProcessor.java | 3 +- .../processors/hadoop/GridHadoopUtils.java | 3 +- .../counter/GridHadoopCounterAdapter.java 
| 2 +- .../hadoop/counter/GridHadoopCountersImpl.java | 3 +- .../counter/GridHadoopFSCounterWriter.java | 3 +- .../hadoop/counter/GridHadoopLongCounter.java | 2 +- .../counter/GridHadoopPerformanceCounter.java | 2 +- .../jobtracker/GridHadoopJobMetadata.java | 4 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 6 +- .../planner/GridHadoopDefaultMapReducePlan.java | 2 +- .../GridHadoopDefaultMapReducePlanner.java | 3 +- .../GridHadoopProtocolJobCountersTask.java | 3 +- .../proto/GridHadoopProtocolJobStatusTask.java | 3 +- .../proto/GridHadoopProtocolKillJobTask.java | 3 +- .../proto/GridHadoopProtocolNextTaskIdTask.java | 3 +- .../proto/GridHadoopProtocolSubmitJobTask.java | 5 +- .../proto/GridHadoopProtocolTaskAdapter.java | 3 +- .../hadoop/shuffle/GridHadoopShuffle.java | 3 +- .../hadoop/shuffle/GridHadoopShuffleAck.java | 2 +- .../hadoop/shuffle/GridHadoopShuffleJob.java | 4 +- .../shuffle/GridHadoopShuffleMessage.java | 3 +- .../GridHadoopConcurrentHashMultimap.java | 3 +- .../collections/GridHadoopHashMultimap.java | 3 +- .../collections/GridHadoopHashMultimapBase.java | 3 +- .../shuffle/collections/GridHadoopMultimap.java | 3 +- .../collections/GridHadoopMultimapBase.java | 5 +- .../shuffle/collections/GridHadoopSkipList.java | 3 +- .../GridHadoopEmbeddedTaskExecutor.java | 3 +- .../taskexecutor/GridHadoopExecutorService.java | 2 +- .../taskexecutor/GridHadoopRunnableTask.java | 7 +- .../GridHadoopTaskExecutorAdapter.java | 3 +- .../taskexecutor/GridHadoopTaskStatus.java | 2 +- .../GridHadoopExternalTaskExecutor.java | 3 +- .../GridHadoopJobInfoUpdateRequest.java | 2 +- .../GridHadoopPrepareForJobRequest.java | 2 +- .../GridHadoopTaskExecutionRequest.java | 2 +- .../external/GridHadoopTaskFinishedMessage.java | 2 +- .../child/GridHadoopChildProcessRunner.java | 5 +- .../hadoop/v1/GridHadoopV1CleanupTask.java | 3 +- .../hadoop/v1/GridHadoopV1MapTask.java | 3 +- .../hadoop/v1/GridHadoopV1OutputCollector.java | 3 +- .../hadoop/v1/GridHadoopV1Partitioner.java | 
2 +- .../hadoop/v1/GridHadoopV1ReduceTask.java | 3 +- .../hadoop/v1/GridHadoopV1Reporter.java | 2 +- .../hadoop/v1/GridHadoopV1SetupTask.java | 3 +- .../hadoop/v1/GridHadoopV1Splitter.java | 3 +- .../processors/hadoop/v1/GridHadoopV1Task.java | 2 +- .../hadoop/v2/GridHadoopExternalSplit.java | 2 +- .../v2/GridHadoopSerializationWrapper.java | 3 +- .../hadoop/v2/GridHadoopSplitWrapper.java | 2 +- .../hadoop/v2/GridHadoopV2CleanupTask.java | 2 +- .../hadoop/v2/GridHadoopV2Context.java | 3 +- .../processors/hadoop/v2/GridHadoopV2Job.java | 3 +- .../v2/GridHadoopV2JobResourceManager.java | 2 +- .../hadoop/v2/GridHadoopV2MapTask.java | 2 +- .../hadoop/v2/GridHadoopV2Partitioner.java | 2 +- .../hadoop/v2/GridHadoopV2ReduceTask.java | 2 +- .../hadoop/v2/GridHadoopV2SetupTask.java | 2 +- .../hadoop/v2/GridHadoopV2Splitter.java | 2 +- .../processors/hadoop/v2/GridHadoopV2Task.java | 3 +- .../hadoop/v2/GridHadoopV2TaskContext.java | 2 +- .../v2/GridHadoopWritableSerialization.java | 3 +- ...ridHadoopClientProtocolEmbeddedSelfTest.java | 2 +- .../hadoop/GridHadoopPopularWordsTest.java | 294 + .../ignite/hadoop/GridHadoopTestUtils.java | 102 + .../ignite/hadoop/books/alice-in-wonderland.txt | 3735 +++++ .../apache/ignite/hadoop/books/art-of-war.txt | 6982 +++++++++ .../ignite/hadoop/books/huckleberry-finn.txt | 11733 +++++++++++++++ .../ignite/hadoop/books/sherlock-holmes.txt | 13052 +++++++++++++++++ .../apache/ignite/hadoop/books/tom-sawyer.txt | 8858 +++++++++++ .../grid/hadoop/GridHadoopPopularWordsTest.java | 294 - .../grid/hadoop/GridHadoopTestUtils.java | 102 - .../grid/hadoop/books/alice-in-wonderland.txt | 3735 ----- .../gridgain/grid/hadoop/books/art-of-war.txt | 6982 --------- .../grid/hadoop/books/huckleberry-finn.txt | 11733 --------------- .../grid/hadoop/books/sherlock-holmes.txt | 13052 ----------------- .../gridgain/grid/hadoop/books/tom-sawyer.txt | 8858 ----------- .../hadoop/GridHadoopAbstractSelfTest.java | 2 +- .../hadoop/GridHadoopCommandLineTest.java | 
2 +- ...idHadoopDefaultMapReducePlannerSelfTest.java | 2 +- .../hadoop/GridHadoopGroupingTest.java | 2 +- .../hadoop/GridHadoopJobTrackerSelfTest.java | 2 +- .../GridHadoopMapReduceEmbeddedSelfTest.java | 2 +- .../hadoop/GridHadoopMapReduceTest.java | 2 +- .../GridHadoopSerializationWrapperSelfTest.java | 3 +- .../hadoop/GridHadoopSortingExternalTest.java | 2 +- .../hadoop/GridHadoopSortingTest.java | 2 +- .../hadoop/GridHadoopSplitWrapperSelfTest.java | 1 - .../hadoop/GridHadoopTaskExecutionSelfTest.java | 3 +- .../hadoop/GridHadoopTasksAllVersionsTest.java | 3 +- .../hadoop/GridHadoopTasksV1Test.java | 2 +- .../hadoop/GridHadoopTasksV2Test.java | 2 +- .../GridHadoopTestRoundRobinMrPlanner.java | 3 +- .../hadoop/GridHadoopTestTaskContext.java | 3 +- .../hadoop/GridHadoopV2JobSelfTest.java | 3 +- .../collections/GridHadoopAbstractMapTest.java | 3 +- ...ridHadoopConcurrentHashMultimapSelftest.java | 2 +- .../collections/GridHadoopHashMapSelfTest.java | 2 +- .../collections/GridHadoopSkipListSelfTest.java | 2 +- ...GridHadoopExternalTaskExecutionSelfTest.java | 3 +- .../java/org/gridgain/grid/GridSpringBean.java | 2 +- pom.xml | 4 +- 164 files changed, 47602 insertions(+), 47650 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/config/hadoop/default-config.xml ---------------------------------------------------------------------- diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml index 90c4259..e076524 100644 --- a/config/hadoop/default-config.xml +++ b/config/hadoop/default-config.xml @@ -94,7 +94,7 @@ Apache Hadoop Accelerator configuration. --> <property name="hadoopConfiguration"> - <bean class="org.gridgain.grid.hadoop.GridHadoopConfiguration"> + <bean class="org.apache.ignite.hadoop.GridHadoopConfiguration"> <!-- Information about finished jobs will be kept for 30 seconds. 
--> <property name="finishedJobInfoTtl" value="30000"/> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index d1e1a55..d5d4de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -22,9 +22,8 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.fs.IgniteFsConfiguration; import org.apache.ignite.plugin.*; import org.apache.ignite.product.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.hadoop.*; +import org.apache.ignite.hadoop.*; import org.apache.ignite.plugin.security.*; import org.gridgain.grid.util.typedef.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 6508c95..265af4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -32,7 +32,7 @@ import org.apache.ignite.spi.indexing.*; import org.apache.ignite.streamer.*; import org.apache.ignite.client.ssl.*; import org.gridgain.grid.dotnet.*; -import org.gridgain.grid.hadoop.*; +import org.apache.ignite.hadoop.*; import org.gridgain.grid.kernal.managers.eventstorage.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.plugin.segmentation.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoop.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoop.java new file mode 100644 index 0000000..64d2dec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoop.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +/** + * Hadoop facade providing access to GridGain Hadoop features. + */ +public interface GridHadoop { + /** + * Gets Hadoop module configuration. + * + * @return Hadoop module configuration. + */ + public GridHadoopConfiguration configuration(); + + /** + * Generate next job ID. + * + * @return Next job ID. + */ + public GridHadoopJobId nextJobId(); + + /** + * Submits job to job tracker. + * + * @param jobId Job ID to submit. + * @param jobInfo Job info to submit. + * @return Execution future. 
+ */ + public IgniteFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo); + + /** + * Gets Hadoop job execution status. + * + * @param jobId Job ID to get status for. + * @return Job execution status or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Returns job counters. + * + * @param jobId Job ID to get counters for. + * @return Job counters object. + * @throws IgniteCheckedException If failed. + */ + public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Gets Hadoop finish future for particular job. + * + * @param jobId Job ID. + * @return Job finish future or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException; + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopConfiguration.java new file mode 100644 index 0000000..4948484 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopConfiguration.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Hadoop configuration. + */ +public class GridHadoopConfiguration { + /** Default finished job info time-to-live. */ + public static final long DFLT_FINISHED_JOB_INFO_TTL = 10_000; + + /** Default value for external execution flag. */ + public static final boolean DFLT_EXTERNAL_EXECUTION = false; + + /** Default value for the max parallel tasks. */ + public static final int DFLT_MAX_PARALLEL_TASKS = Runtime.getRuntime().availableProcessors(); + + /** Default value for the max task queue size. */ + public static final int DFLT_MAX_TASK_QUEUE_SIZE = 1000; + + /** Map reduce planner. */ + private GridHadoopMapReducePlanner planner; + + /** */ + private boolean extExecution = DFLT_EXTERNAL_EXECUTION; + + /** Finished job info TTL. */ + private long finishedJobInfoTtl = DFLT_FINISHED_JOB_INFO_TTL; + + /** */ + private int maxParallelTasks = DFLT_MAX_PARALLEL_TASKS; + + /** */ + private int maxTaskQueueSize = DFLT_MAX_TASK_QUEUE_SIZE; + + /** + * Default constructor. + */ + public GridHadoopConfiguration() { + // No-op. + } + + /** + * Copy constructor. + * + * @param cfg Configuration to copy. 
+ */ + public GridHadoopConfiguration(GridHadoopConfiguration cfg) { + // Preserve alphabetic order. + extExecution = cfg.isExternalExecution(); + finishedJobInfoTtl = cfg.getFinishedJobInfoTtl(); + planner = cfg.getMapReducePlanner(); + maxParallelTasks = cfg.getMaxParallelTasks(); + maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + } + + /** + * Gets max number of local tasks that may be executed in parallel. + * + * @return Max number of local tasks that may be executed in parallel. + */ + public int getMaxParallelTasks() { + return maxParallelTasks; + } + + /** + * Sets max number of local tasks that may be executed in parallel. + * + * @param maxParallelTasks Max number of local tasks that may be executed in parallel. + */ + public void setMaxParallelTasks(int maxParallelTasks) { + this.maxParallelTasks = maxParallelTasks; + } + + /** + * Gets max task queue size. + * + * @return Max task queue size. + */ + public int getMaxTaskQueueSize() { + return maxTaskQueueSize; + } + + /** + * Sets max task queue size. + * + * @param maxTaskQueueSize Max task queue size. + */ + public void setMaxTaskQueueSize(int maxTaskQueueSize) { + this.maxTaskQueueSize = maxTaskQueueSize; + } + + /** + * Gets finished job info time-to-live in milliseconds. + * + * @return Finished job info time-to-live. + */ + public long getFinishedJobInfoTtl() { + return finishedJobInfoTtl; + } + + /** + * Sets finished job info time-to-live. + * + * @param finishedJobInfoTtl Finished job info time-to-live. + */ + public void setFinishedJobInfoTtl(long finishedJobInfoTtl) { + this.finishedJobInfoTtl = finishedJobInfoTtl; + } + + /** + * Gets external task execution flag. If {@code true}, hadoop job tasks will be executed in an external + * (relative to node) process. + * + * @return {@code True} if external execution. + */ + public boolean isExternalExecution() { + return extExecution; + } + + /** + * Sets external task execution flag. 
+ * + * @param extExecution {@code True} if tasks should be executed in an external process. + * @see #isExternalExecution() + */ + public void setExternalExecution(boolean extExecution) { + this.extExecution = extExecution; + } + + /** + * Gets Hadoop map-reduce planner, a component which defines job execution plan based on job + * configuration and current grid topology. + * + * @return Map-reduce planner. + */ + public GridHadoopMapReducePlanner getMapReducePlanner() { + return planner; + } + + /** + * Sets Hadoop map-reduce planner, a component which defines job execution plan based on job + * configuration and current grid topology. + * + * @param planner Map-reduce planner. + */ + public void setMapReducePlanner(GridHadoopMapReducePlanner planner) { + this.planner = planner; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridHadoopConfiguration.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounter.java new file mode 100644 index 0000000..5837f13 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +/** + * Hadoop counter. + */ +public interface GridHadoopCounter { + /** + * Gets name. + * + * @return Name of the counter. + */ + public String name(); + + /** + * Gets counter group. + * + * @return Counter group's name. + */ + public String group(); + + /** + * Merge the given counter to this counter. + * + * @param cntr Counter to merge into this counter. + */ + public void merge(GridHadoopCounter cntr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounterWriter.java new file mode 100644 index 0000000..574a6b9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounterWriter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; + +/** + * The object that writes some system counters to some storage for each running job. This operation is a part of + * whole statistics collection process. + */ +public interface GridHadoopCounterWriter { + /** + * Writes counters of given job to some statistics storage. + * + * @param jobInfo Job info. + * @param jobId Job id. + * @param cntrs Counters. + * @throws IgniteCheckedException If failed. + */ + public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounters.java new file mode 100644 index 0000000..3ec130a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopCounters.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import java.util.*; + +/** + * Counters store. + */ +public interface GridHadoopCounters { + /** + * Returns counter for the specified group and counter name. Creates new if it does not exist. + * + * @param grp Counter group name. + * @param name Counter name. + * @param cls Class for new instance creation if it's needed. + * @return The counter that was found or newly created for the given group and name. + */ + <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls); + + /** + * Returns all existing counters. + * + * @return Collection of counters. + */ + Collection<GridHadoopCounter> all(); + + /** + * Merges all counters from another store with existing counters. + * + * @param other Counters to merge with. + */ + void merge(GridHadoopCounters other); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopFileBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopFileBlock.java new file mode 100644 index 0000000..a9c8801 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopFileBlock.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Hadoop file block. + */ +public class GridHadoopFileBlock extends GridHadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + protected URI file; + + /** */ + @GridToStringInclude + protected long start; + + /** */ + @GridToStringInclude + protected long len; + + /** + * Creates new file block. + */ + public GridHadoopFileBlock() { + // No-op. + } + + /** + * Creates new file block. + * + * @param hosts List of hosts where the block resides. + * @param file File URI. + * @param start Start position of the block in the file. + * @param len Length of the block. 
+ */ + public GridHadoopFileBlock(String[] hosts, URI file, long start, long len) { + A.notNull(hosts, "hosts", file, "file"); + + this.hosts = hosts; + this.file = file; + this.start = start; + this.len = len; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(file()); + out.writeLong(start()); + out.writeLong(length()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + file = (URI)in.readObject(); + start = in.readLong(); + len = in.readLong(); + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** + * @param len New length. + */ + public void length(long len) { + this.len = len; + } + + /** + * @return Start. + */ + public long start() { + return start; + } + + /** + * @param start New start. + */ + public void start(long start) { + this.start = start; + } + + /** + * @return File. + */ + public URI file() { + return file; + } + + /** + * @param file New file. + */ + public void file(URI file) { + this.file = file; + } + + /** + * @param hosts New hosts. 
+ */ + public void hosts(String[] hosts) { + A.notNull(hosts, "hosts"); + + this.hosts = hosts; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof GridHadoopFileBlock)) + return false; + + GridHadoopFileBlock that = (GridHadoopFileBlock)o; + + return len == that.len && start == that.start && file.equals(that.file); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = file.hashCode(); + + res = 31 * res + (int)(start ^ (start >>> 32)); + res = 31 * res + (int)(len ^ (len >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(GridHadoopFileBlock.class, this, "hosts", Arrays.toString(hosts)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopInputSplit.java new file mode 100644 index 0000000..3ed6359 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopInputSplit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import java.io.*; + +/** + * Abstract fragment of an input data source. + */ +public abstract class GridHadoopInputSplit implements Externalizable { + /** */ + protected String[] hosts; + + /** + * Array of hosts where this input split resides. + * + * @return Hosts. + */ + public String[] hosts() { + assert hosts != null; + + return hosts; + } + + /** + * This method must be implemented for purpose of internal implementation. + * + * @param obj Another object. + * @return {@code true} If objects are equal. + */ + @Override public abstract boolean equals(Object obj); + + /** + * This method must be implemented for purpose of internal implementation. + * + * @return Hash code of the object. + */ + @Override public abstract int hashCode(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJob.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJob.java new file mode 100644 index 0000000..2ba9c4f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJob.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Hadoop job. + */ +public interface GridHadoopJob { + /** + * Gets job ID. + * + * @return Job ID. + */ + public GridHadoopJobId id(); + + /** + * Gets job information. + * + * @return Job information. + */ + public GridHadoopJobInfo info(); + + /** + * Gets collection of input splits for this job. + * + * @return Input splits. + * @throws IgniteCheckedException If failed. + */ + public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException; + + /** + * Returns context for task execution. + * + * @param info Task info. + * @return Task Context. + * @throws IgniteCheckedException If failed. + */ + public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Does all the needed initialization for the job. Will be called on each node where tasks for this job must + * be executed. + * <p> + * If job is running in external mode this method will be called on instance in GridGain node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @param locNodeId Local node ID. + * @throws IgniteCheckedException If failed. 
+ */ + public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; + + /** + * Release all the resources. + * <p> + * If job is running in external mode this method will be called on instance in GridGain node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @throws IgniteCheckedException If failed. + */ + public void dispose(boolean external) throws IgniteCheckedException; + + /** + * Prepare local environment for the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up the job staging directory. + */ + void cleanupStagingDirectory(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobId.java new file mode 100644 index 0000000..34f5f64 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobId.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Job ID. + */ +public class GridHadoopJobId implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** */ + private int jobId; + + /** + * For {@link Externalizable}. + */ + public GridHadoopJobId() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param jobId Job ID. 
+ */ + public GridHadoopJobId(UUID nodeId, int jobId) { + this.nodeId = nodeId; + this.jobId = jobId; + } + + public UUID globalId() { + return nodeId; + } + + public int localId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, nodeId); + out.writeInt(jobId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeId = U.readUuid(in); + jobId = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + GridHadoopJobId that = (GridHadoopJobId) o; + + if (jobId != that.jobId) + return false; + + if (!nodeId.equals(that.nodeId)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * nodeId.hashCode() + jobId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return nodeId + "_" + jobId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobInfo.java new file mode 100644 index 0000000..fa2a00d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Compact job description. + */ +public interface GridHadoopJobInfo extends Serializable { + /** + * Gets optional configuration property for the job. + * + * @param name Property name. + * @return Value or {@code null} if none. + */ + @Nullable public String property(String name); + + /** + * Checks whether job has combiner. + * + * @return {@code true} If job has combiner. + */ + public boolean hasCombiner(); + + /** + * Checks whether job has reducer. + * Actual number of reducers will be in {@link GridHadoopMapReducePlan#reducers()}. + * + * @return {@code true} If job has reducer. + */ + public boolean hasReducer(); + + /** + * Creates new job instance for the given ID. + * {@link GridHadoopJobInfo} is reusable for multiple jobs while {@link GridHadoopJob} is for one job execution. + * This method will be called once for the same ID on one node, though it can be called on the same host + * multiple times from different processes (in case of multiple nodes on the same host or external execution). + * + * @param jobId Job ID. + * @param log Logger. + * @return Job. + * @throws IgniteCheckedException If failed. 
+ */ + GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + + /** + * @return Number of reducers configured for job. + */ + public int reducers(); + + /** + * Gets job name. + * + * @return Job name. + */ + public String jobName(); + + /** + * Gets user name. + * + * @return User name. + */ + public String user(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobPhase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobPhase.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobPhase.java new file mode 100644 index 0000000..0433f41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobPhase.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +/** + * Job run phase. + */ +public enum GridHadoopJobPhase { + /** Job is running setup task. */ + PHASE_SETUP, + + /** Job is running map and combine tasks. */ + PHASE_MAP, + + /** Job has finished all map tasks and running reduce tasks. 
*/ + PHASE_REDUCE, + + /** Job is stopping due to exception during any of the phases. */ + PHASE_CANCELLING, + + /** Job has finished execution. */ + PHASE_COMPLETE +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobProperty.java new file mode 100644 index 0000000..7793c3c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobProperty.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.jetbrains.annotations.*; + +/** + * Enumeration of optional properties supported by GridGain for Apache Hadoop. + */ +public enum GridHadoopJobProperty { + /** + * Initial size for hashmap which stores output of mapper and will be used as input of combiner. + * <p> + * Setting it right allows to avoid rehashing. 
+ */ + COMBINER_HASHMAP_SIZE, + + /** + * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. + * <p> + * Setting it right allows to avoid rehashing. + */ + PARTITION_HASHMAP_SIZE, + + /** + * Specifies number of concurrently running mappers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_MAPPERS, + + /** + * Specifies number of concurrently running reducers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_REDUCERS, + + /** + * Delay in milliseconds after which GridGain server will reply job status. + */ + JOB_STATUS_POLL_DELAY, + + /** + * Size in bytes of single memory page which will be allocated for data structures in shuffle. + * <p> + * By default is {@code 32 * 1024}. + */ + SHUFFLE_OFFHEAP_PAGE_SIZE, + + /** + * If set to {@code true} then input for combiner will not be sorted by key. + * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_COMBINER_NO_SORTING, + + /** + * If set to {@code true} then input for reducer will not be sorted by key. + * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_REDUCER_NO_SORTING; + + /** */ + private final String ptyName; + + /** + * + */ + GridHadoopJobProperty() { + ptyName = "gridgain." 
+ name().toLowerCase().replace('_', '.'); + } + + /** + * @return Property name. + */ + public String propertyName() { + return ptyName; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static String get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, @Nullable String dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : res; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static int get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, int dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : Integer.parseInt(res); + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static boolean get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, boolean dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : Boolean.parseBoolean(res); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobStatus.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobStatus.java new file mode 100644 index 0000000..642dbfb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopJobStatus.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.gridgain.grid.util.typedef.internal.*; + +import java.io.*; + +/** + * Hadoop job status. + */ +public class GridHadoopJobStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private GridHadoopJobId jobId; + + /** Job name. */ + private String jobName; + + /** User. */ + private String usr; + + /** Pending mappers count. */ + private int pendingMapperCnt; + + /** Pending reducers count. */ + private int pendingReducerCnt; + + /** Total mappers count. */ + private int totalMapperCnt; + + /** Total reducers count. */ + private int totalReducerCnt; + /** Phase. */ + private GridHadoopJobPhase jobPhase; + + /** */ + private boolean failed; + + /** Version. */ + private long ver; + + /** + * {@link Externalizable} support. + */ + public GridHadoopJobStatus() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job ID. + * @param jobName Job name. + * @param usr User. + * @param pendingMapperCnt Pending mappers count. + * @param pendingReducerCnt Pending reducers count. + * @param totalMapperCnt Total mappers count. + * @param totalReducerCnt Total reducers count. + * @param jobPhase Job phase. + * @param failed Failed. + * @param ver Version. 
+ */ + public GridHadoopJobStatus( + GridHadoopJobId jobId, + String jobName, + String usr, + int pendingMapperCnt, + int pendingReducerCnt, + int totalMapperCnt, + int totalReducerCnt, + GridHadoopJobPhase jobPhase, + boolean failed, + long ver + ) { + this.jobId = jobId; + this.jobName = jobName; + this.usr = usr; + this.pendingMapperCnt = pendingMapperCnt; + this.pendingReducerCnt = pendingReducerCnt; + this.totalMapperCnt = totalMapperCnt; + this.totalReducerCnt = totalReducerCnt; + this.jobPhase = jobPhase; + this.failed = failed; + this.ver = ver; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @return Job name. + */ + public String jobName() { + return jobName; + } + + /** + * @return User. + */ + public String user() { + return usr; + } + + /** + * @return Pending mappers count. + */ + public int pendingMapperCnt() { + return pendingMapperCnt; + } + + /** + * @return Pending reducers count. + */ + public int pendingReducerCnt() { + return pendingReducerCnt; + } + + /** + * @return Total mappers count. + */ + public int totalMapperCnt() { + return totalMapperCnt; + } + + /** + * @return Total reducers count. + */ + public int totalReducerCnt() { + return totalReducerCnt; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @return Job phase. + */ + public GridHadoopJobPhase jobPhase() { + return jobPhase; + } + + /** + * @return {@code true} If the job failed. 
+ */ + public boolean isFailed() { + return failed; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridHadoopJobStatus.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(jobId); + U.writeString(out, jobName); + U.writeString(out, usr); + out.writeInt(pendingMapperCnt); + out.writeInt(pendingReducerCnt); + out.writeInt(totalMapperCnt); + out.writeInt(totalReducerCnt); + out.writeObject(jobPhase); + out.writeBoolean(failed); + out.writeLong(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = (GridHadoopJobId)in.readObject(); + jobName = U.readString(in); + usr = U.readString(in); + pendingMapperCnt = in.readInt(); + pendingReducerCnt = in.readInt(); + totalMapperCnt = in.readInt(); + totalReducerCnt = in.readInt(); + jobPhase = (GridHadoopJobPhase)in.readObject(); + failed = in.readBoolean(); + ver = in.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlan.java new file mode 100644 index 0000000..f0fb00a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlan.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Map-reduce job execution plan. + */ +public interface GridHadoopMapReducePlan extends Serializable { + /** + * Gets collection of file blocks for which mappers should be executed. + * + * @param nodeId Node ID to check. + * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. + */ + @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId); + + /** + * Gets reducer IDs that should be started on given node. + * + * @param nodeId Node ID to check. + * @return Array of reducer IDs. + */ + @Nullable public int[] reducers(UUID nodeId); + + /** + * Gets collection of all node IDs involved in map part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> mapperNodeIds(); + + /** + * Gets collection of all node IDs involved in reduce part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> reducerNodeIds(); + + /** + * Gets overall number of mappers for the job. + * + * @return Number of mappers. + */ + public int mappers(); + + /** + * Gets overall number of reducers for the job. + * + * @return Number of reducers. + */ + public int reducers(); + + /** + * Gets node ID for reducer. + * + * @param reducer Reducer. + * @return Node ID. 
+ */ + public UUID nodeForReducer(int reducer); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlanner.java new file mode 100644 index 0000000..1b5443c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopMapReducePlanner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Map-reduce execution planner. + */ +public interface GridHadoopMapReducePlanner { + /** + * Prepares map-reduce execution plan for the given job and topology. + * + * @param job Job. + * @param top Topology. + * @param oldPlan Old plan in case of partial failure. + * @return Map reduce plan. 
+ */ + public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, + @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopPartitioner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopPartitioner.java new file mode 100644 index 0000000..a3d6bd1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopPartitioner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +/** + * Partitioner. + */ +public interface GridHadoopPartitioner { + /** + * Gets partition which is actually a reducer index for the given key and value pair. + * + * @param key Key. + * @param val Value. + * @param parts Number of partitions. + * @return Partition. 
+ */ + public int partition(Object key, Object val, int parts); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopSerialization.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopSerialization.java new file mode 100644 index 0000000..4e3d532 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopSerialization.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized. + */ +public interface GridHadoopSerialization extends AutoCloseable { + /** + * Writes the given object to output. + * + * @param out Output. + * @param obj Object to serialize. + * @throws IgniteCheckedException If failed. 
+ */ + public void write(DataOutput out, Object obj) throws IgniteCheckedException; + + /** + * Reads object from the given input optionally reusing given instance. + * + * @param in Input. + * @param obj Object. + * @return New object or reused instance. + * @throws IgniteCheckedException If failed. + */ + public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException; + + /** + * Finalise the internal objects. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTask.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTask.java new file mode 100644 index 0000000..138a54f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; +import org.gridgain.grid.*; + +import java.io.*; + +/** + * Hadoop task. + */ +public abstract class GridHadoopTask { + /** */ + private GridHadoopTaskInfo taskInfo; + + /** + * Creates task. + * + * @param taskInfo Task info. + */ + protected GridHadoopTask(GridHadoopTaskInfo taskInfo) { + assert taskInfo != null; + + this.taskInfo = taskInfo; + } + + /** + * For {@link Externalizable}. + */ + @SuppressWarnings("ConstructorNotProtectedInAbstractClass") + public GridHadoopTask() { + // No-op. + } + + /** + * Gets task info. + * + * @return Task info. + */ + public GridHadoopTaskInfo info() { + return taskInfo; + } + + /** + * Runs task. + * + * @param taskCtx Context. + * @throws GridInterruptedException If interrupted. + * @throws IgniteCheckedException If failed. + */ + public abstract void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException; + + /** + * Interrupts task execution. + */ + public abstract void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskContext.java new file mode 100644 index 0000000..2bc83bd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskContext.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Task context: holds the job, the task info and the task input and output. + */ +public abstract class GridHadoopTaskContext { + /** Job. */ + private final GridHadoopJob job; + + /** Task input. */ + private GridHadoopTaskInput input; + + /** Task output. */ + private GridHadoopTaskOutput output; + + /** Task info. */ + private GridHadoopTaskInfo taskInfo; + + /** + * @param taskInfo Task info. + * @param job Job. + */ + protected GridHadoopTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job) { + this.taskInfo = taskInfo; + this.job = job; + } + + /** + * Gets task info. + * + * @return Task info. + */ + public GridHadoopTaskInfo taskInfo() { + return taskInfo; + } + + /** + * Sets a new task info. + * + * @param info Task info. + */ + public void taskInfo(GridHadoopTaskInfo info) { + taskInfo = info; + } + + /** + * Gets task output. + * + * @return Task output. + */ + public GridHadoopTaskOutput output() { + return output; + } + + /** + * Gets task input. + * + * @return Task input. + */ + public GridHadoopTaskInput input() { + return input; + } + + /** + * @return Job. + */ + public GridHadoopJob job() { + return job; + } + + /** + * Gets counter for the given group and name, typed by the {@code cls} argument. + * + * @param grp Counter group's name. + * @param name Counter name. + * @return Counter. + */ + public abstract <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls); + + /** + * Gets all known counters. + * + * @return Unmodifiable collection of counters. + */ + public abstract GridHadoopCounters counters(); + + /** + * Sets input of the task.
+ * + * @param in Input. + */ + public void input(GridHadoopTaskInput in) { + input = in; + } + + /** + * Sets output of the task. + * + * @param out Output. + */ + public void output(GridHadoopTaskOutput out) { + output = out; + } + + /** + * Gets partitioner. + * + * @return Partitioner. + * @throws IgniteCheckedException If failed. + */ + public abstract GridHadoopPartitioner partitioner() throws IgniteCheckedException; + + /** + * Gets serializer for keys. + * + * @return Serializer for keys. + * @throws IgniteCheckedException If failed. + */ + public abstract GridHadoopSerialization keySerialization() throws IgniteCheckedException; + + /** + * Gets serializer for values. + * + * @return Serializer for values. + * @throws IgniteCheckedException If failed. + */ + public abstract GridHadoopSerialization valueSerialization() throws IgniteCheckedException; + + /** + * Gets sorting comparator. + * + * @return Comparator for sorting. + */ + public abstract Comparator<Object> sortComparator(); + + /** + * Gets comparator for grouping on combine or reduce operation. + * + * @return Comparator. + */ + public abstract Comparator<Object> groupComparator(); + + /** + * Executes current task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void run() throws IgniteCheckedException; + + /** + * Cancels current task execution. + */ + public abstract void cancel(); + + /** + * Prepares local environment for the task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void prepareTaskEnvironment() throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @throws IgniteCheckedException If failed.
+ */ + public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInfo.java new file mode 100644 index 0000000..d3d1bf7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInfo.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task info. + */ +public class GridHadoopTaskInfo implements Externalizable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Task type. */ + private GridHadoopTaskType type; + + /** Job id. */ + private GridHadoopJobId jobId; + + /** Task number. */ + private int taskNum; + + /** Attempt. */ + private int attempt; + + /** Input split, may be {@code null}. */ + private GridHadoopInputSplit inputSplit; + + /** + * For {@link Externalizable}.
+ */ + public GridHadoopTaskInfo() { + // No-op. + } + + /** + * Creates new task info. + * + * @param type Task type. + * @param jobId Job id. + * @param taskNum Task number. + * @param attempt Attempt for this task. + * @param inputSplit Input split, may be {@code null}. + */ + public GridHadoopTaskInfo(GridHadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt, + @Nullable GridHadoopInputSplit inputSplit) { + this.type = type; + this.jobId = jobId; + this.taskNum = taskNum; + this.attempt = attempt; + this.inputSplit = inputSplit; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte(type.ordinal()); + out.writeObject(jobId); + out.writeInt(taskNum); + out.writeInt(attempt); + out.writeObject(inputSplit); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + type = GridHadoopTaskType.fromOrdinal(in.readByte()); + jobId = (GridHadoopJobId)in.readObject(); + taskNum = in.readInt(); + attempt = in.readInt(); + inputSplit = (GridHadoopInputSplit)in.readObject(); + } + + /** + * @return Type. + */ + public GridHadoopTaskType type() { + return type; + } + + /** + * @return Job id. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @return Task number. + */ + public int taskNumber() { + return taskNum; + } + + /** + * @return Attempt. + */ + public int attempt() { + return attempt; + } + + /** + * @return Input split.
+ */ + @Nullable public GridHadoopInputSplit inputSplit() { + return inputSplit; + } + + /** {@inheritDoc} Compares type, job id, task number and attempt; the input split is not compared. */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof GridHadoopTaskInfo)) + return false; + + GridHadoopTaskInfo that = (GridHadoopTaskInfo)o; + + return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type; + } + + /** {@inheritDoc} Built from type, job id, task number and attempt; the input split is not used. */ + @Override public int hashCode() { + int res = type.hashCode(); + + res = 31 * res + jobId.hashCode(); + res = 31 * res + taskNum; + res = 31 * res + attempt; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridHadoopTaskInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInput.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInput.java new file mode 100644 index 0000000..cfd7e0c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskInput.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Task input. + */ +public interface GridHadoopTaskInput extends AutoCloseable { + /** + * Moves cursor to the next element. + * + * @return {@code false} if the input is exhausted. + */ + boolean next(); + + /** + * Gets current key. + * + * @return Key. + */ + Object key(); + + /** + * Gets values for current key. + * + * @return Values. + */ + Iterator<?> values(); + + /** + * Closes input. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskOutput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskOutput.java new file mode 100644 index 0000000..87f9158 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskOutput.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.hadoop; + +import org.apache.ignite.*; + +/** + * Task output. + */ +public interface GridHadoopTaskOutput extends AutoCloseable { + /** + * Writes key and value to the output, throwing {@link IgniteCheckedException} if the write fails. + * + * @param key Key. + * @param val Value. + */ + public void write(Object key, Object val) throws IgniteCheckedException; + + /** + * Closes output. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskType.java new file mode 100644 index 0000000..ae1f572 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/GridHadoopTaskType.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.hadoop; + +import org.jetbrains.annotations.*; + +/** + * Task type. + */ +public enum GridHadoopTaskType { + /** Setup task. */ + SETUP, + + /** Map task. */ + MAP, + + /** Reduce task. */ + REDUCE, + + /** Combine task. */ + COMBINE, + + /** Commit task. */ + COMMIT, + + /** Abort task. */ + ABORT; + + /** Enumerated values. */ + private static final GridHadoopTaskType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value or {@code null} if the ordinal is out of range. + */ + @Nullable public static GridHadoopTaskType fromOrdinal(byte ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afd94f76/modules/core/src/main/java/org/apache/ignite/hadoop/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/package.html b/modules/core/src/main/java/org/apache/ignite/hadoop/package.html new file mode 100644 index 0000000..6c751ed --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/hadoop/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+ --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains Hadoop APIs. +</body> +</html>