# IGNITE-386: Moving core classes (7).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/28fad185 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/28fad185 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/28fad185 Branch: refs/heads/ignite-386 Commit: 28fad1854ce1684bb941bd424430ea25ef870e7a Parents: 06525ca Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 17:14:08 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 17:14:09 2015 +0300 ---------------------------------------------------------------------- config/hadoop/default-config.xml | 2 +- .../configuration/HadoopConfiguration.java | 173 +++++++++++ .../configuration/IgniteConfiguration.java | 7 +- .../ignite/internal/GridKernalContext.java | 2 +- .../ignite/internal/GridKernalContextImpl.java | 8 +- .../ignite/internal/IgniteComponentType.java | 2 +- .../hadoop/GridHadoopConfiguration.java | 172 ----------- .../processors/hadoop/GridHadoopCounters.java | 49 ---- .../processors/hadoop/GridHadoopJobId.java | 103 ------- .../processors/hadoop/GridHadoopJobInfo.java | 83 ------ .../processors/hadoop/GridHadoopJobPhase.java | 38 --- .../hadoop/GridHadoopJobProperty.java | 138 --------- .../processors/hadoop/GridHadoopJobStatus.java | 207 ------------- .../hadoop/GridHadoopMapReducePlan.java | 80 ----- .../hadoop/GridHadoopMapReducePlanner.java | 40 --- .../processors/hadoop/GridHadoopTaskInfo.java | 153 ---------- .../internal/processors/hadoop/Hadoop.java | 16 +- .../processors/hadoop/HadoopCounter.java | 44 --- .../processors/hadoop/HadoopCounterWriter.java | 36 --- .../internal/processors/hadoop/HadoopJob.java | 10 +- .../internal/processors/hadoop/HadoopJobId.java | 103 +++++++ .../processors/hadoop/HadoopJobInfo.java | 83 ++++++ .../processors/hadoop/HadoopJobPhase.java | 38 +++ .../processors/hadoop/HadoopJobProperty.java | 138 +++++++++ 
.../processors/hadoop/HadoopJobStatus.java | 207 +++++++++++++ .../processors/hadoop/HadoopMapReducePlan.java | 80 +++++ .../hadoop/HadoopMapReducePlanner.java | 40 +++ .../processors/hadoop/HadoopNoopProcessor.java | 76 +++++ .../hadoop/HadoopProcessorAdapter.java | 96 ++++++ .../internal/processors/hadoop/HadoopTask.java | 6 +- .../processors/hadoop/HadoopTaskContext.java | 11 +- .../processors/hadoop/HadoopTaskInfo.java | 153 ++++++++++ .../hadoop/IgniteHadoopNoopProcessor.java | 74 ----- .../hadoop/IgniteHadoopProcessorAdapter.java | 94 ------ .../hadoop/counter/HadoopCounter.java | 44 +++ .../hadoop/counter/HadoopCounterWriter.java | 37 +++ .../hadoop/counter/HadoopCounters.java | 49 ++++ .../fs/IgniteHadoopFileSystemCounterWriter.java | 3 +- .../processors/hadoop/HadoopClassLoader.java | 6 +- .../processors/hadoop/HadoopContext.java | 11 +- .../processors/hadoop/HadoopCounterGroup.java | 121 -------- .../processors/hadoop/HadoopCounters.java | 216 -------------- .../processors/hadoop/HadoopDefaultJobInfo.java | 6 +- .../internal/processors/hadoop/HadoopImpl.java | 16 +- .../hadoop/HadoopMapReduceCounterGroup.java | 121 ++++++++ .../hadoop/HadoopMapReduceCounters.java | 216 ++++++++++++++ .../processors/hadoop/HadoopProcessor.java | 30 +- .../internal/processors/hadoop/HadoopUtils.java | 6 +- .../hadoop/counter/HadoopCounterAdapter.java | 1 - .../hadoop/counter/HadoopCountersImpl.java | 7 +- .../hadoop/counter/HadoopLongCounter.java | 2 - .../counter/HadoopPerformanceCounter.java | 20 +- .../hadoop/igfs/HadoopIgfsJclLogger.java | 5 +- .../hadoop/jobtracker/HadoopJobMetadata.java | 41 +-- .../hadoop/jobtracker/HadoopJobTracker.java | 161 +++++----- .../planner/HadoopDefaultMapReducePlan.java | 2 +- .../planner/HadoopDefaultMapReducePlanner.java | 6 +- .../hadoop/proto/HadoopClientProtocol.java | 17 +- .../proto/HadoopProtocolJobCountersTask.java | 7 +- .../proto/HadoopProtocolJobStatusTask.java | 6 +- .../hadoop/proto/HadoopProtocolKillJobTask.java | 2 +- 
.../proto/HadoopProtocolNextTaskIdTask.java | 4 +- .../proto/HadoopProtocolSubmitJobTask.java | 12 +- .../hadoop/shuffle/HadoopShuffle.java | 12 +- .../hadoop/shuffle/HadoopShuffleAck.java | 8 +- .../hadoop/shuffle/HadoopShuffleJob.java | 4 +- .../hadoop/shuffle/HadoopShuffleMessage.java | 8 +- .../HadoopConcurrentHashMultimap.java | 2 +- .../shuffle/collections/HadoopHashMultimap.java | 2 +- .../collections/HadoopHashMultimapBase.java | 2 +- .../shuffle/collections/HadoopMultimapBase.java | 4 +- .../shuffle/collections/HadoopSkipList.java | 2 +- .../HadoopEmbeddedTaskExecutor.java | 12 +- .../taskexecutor/HadoopExecutorService.java | 2 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 10 +- .../taskexecutor/HadoopTaskExecutorAdapter.java | 4 +- .../hadoop/taskexecutor/HadoopTaskStatus.java | 10 +- .../external/HadoopExternalTaskExecutor.java | 54 ++-- .../external/HadoopJobInfoUpdateRequest.java | 14 +- .../external/HadoopPrepareForJobRequest.java | 14 +- .../external/HadoopTaskExecutionRequest.java | 22 +- .../external/HadoopTaskFinishedMessage.java | 8 +- .../child/HadoopChildProcessRunner.java | 10 +- .../communication/HadoopMarshallerFilter.java | 2 +- .../hadoop/v1/HadoopV1CleanupTask.java | 2 +- .../processors/hadoop/v1/HadoopV1MapTask.java | 2 +- .../hadoop/v1/HadoopV1ReduceTask.java | 2 +- .../processors/hadoop/v1/HadoopV1SetupTask.java | 2 +- .../processors/hadoop/v1/HadoopV1Task.java | 2 +- .../hadoop/v2/HadoopV2CleanupTask.java | 2 +- .../processors/hadoop/v2/HadoopV2Context.java | 4 +- .../processors/hadoop/v2/HadoopV2Job.java | 20 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 4 +- .../processors/hadoop/v2/HadoopV2MapTask.java | 4 +- .../hadoop/v2/HadoopV2ReduceTask.java | 2 +- .../processors/hadoop/v2/HadoopV2SetupTask.java | 2 +- .../processors/hadoop/v2/HadoopV2Task.java | 2 +- .../hadoop/v2/HadoopV2TaskContext.java | 7 +- .../HadoopClientProtocolEmbeddedSelfTest.java | 6 +- .../hadoop/GridHadoopPopularWordsTest.java | 294 
------------------- .../processors/hadoop/GridHadoopSharedMap.java | 67 ----- .../processors/hadoop/GridHadoopStartup.java | 54 ---- .../GridHadoopTestRoundRobinMrPlanner.java | 66 ----- .../processors/hadoop/GridHadoopTestUtils.java | 102 ------- .../hadoop/HadoopAbstractSelfTest.java | 4 +- .../hadoop/HadoopCommandLineTest.java | 2 +- .../HadoopDefaultMapReducePlannerSelfTest.java | 18 +- .../processors/hadoop/HadoopGroupingTest.java | 9 +- .../hadoop/HadoopJobTrackerSelfTest.java | 17 +- .../hadoop/HadoopMapReduceEmbeddedSelfTest.java | 17 +- .../processors/hadoop/HadoopMapReduceTest.java | 17 +- .../hadoop/HadoopPopularWordsTest.java | 294 +++++++++++++++++++ .../processors/hadoop/HadoopSharedMap.java | 67 +++++ .../hadoop/HadoopSortingExternalTest.java | 6 +- .../processors/hadoop/HadoopSortingTest.java | 9 +- .../processors/hadoop/HadoopStartup.java | 54 ++++ .../hadoop/HadoopTaskExecutionSelfTest.java | 16 +- .../hadoop/HadoopTasksAllVersionsTest.java | 18 +- .../processors/hadoop/HadoopTasksV1Test.java | 4 +- .../processors/hadoop/HadoopTasksV2Test.java | 8 +- .../hadoop/HadoopTestRoundRobinMrPlanner.java | 66 +++++ .../hadoop/HadoopTestTaskContext.java | 2 +- .../processors/hadoop/HadoopTestUtils.java | 102 +++++++ .../processors/hadoop/HadoopV2JobSelfTest.java | 4 +- .../hadoop/examples/GridHadoopWordCount1.java | 88 ------ .../examples/GridHadoopWordCount1Map.java | 62 ---- .../examples/GridHadoopWordCount1Reduce.java | 51 ---- .../hadoop/examples/GridHadoopWordCount2.java | 95 ------ .../examples/GridHadoopWordCount2Mapper.java | 72 ----- .../examples/GridHadoopWordCount2Reducer.java | 70 ----- .../hadoop/examples/HadoopWordCount1.java | 88 ++++++ .../hadoop/examples/HadoopWordCount1Map.java | 62 ++++ .../hadoop/examples/HadoopWordCount1Reduce.java | 51 ++++ .../hadoop/examples/HadoopWordCount2.java | 95 ++++++ .../hadoop/examples/HadoopWordCount2Mapper.java | 72 +++++ .../examples/HadoopWordCount2Reducer.java | 70 +++++ 
...ridHadoopConcurrentHashMultimapSelftest.java | 267 ----------------- .../collections/HadoopAbstractMapTest.java | 8 +- .../HadoopConcurrentHashMultimapSelftest.java | 267 +++++++++++++++++ .../collections/HadoopSkipListSelfTest.java | 4 +- .../HadoopExternalTaskExecutionSelfTest.java | 9 +- .../testsuites/IgniteHadoopTestSuite.java | 2 +- 142 files changed, 3392 insertions(+), 3364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/config/hadoop/default-config.xml ---------------------------------------------------------------------- diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml index 8f5854f..f500529 100644 --- a/config/hadoop/default-config.xml +++ b/config/hadoop/default-config.xml @@ -94,7 +94,7 @@ Apache Hadoop Accelerator configuration. --> <property name="hadoopConfiguration"> - <bean class="org.apache.ignite.internal.processors.hadoop.GridHadoopConfiguration"> + <bean class="org.apache.ignite.configuration.HadoopConfiguration"> <!-- Information about finished jobs will be kept for 30 seconds. --> <property name="finishedJobInfoTtl" value="30000"/> </bean> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java new file mode 100644 index 0000000..7e6183d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Hadoop configuration. + */ +public class HadoopConfiguration { + /** Default finished job info time-to-live. */ + public static final long DFLT_FINISHED_JOB_INFO_TTL = 10_000; + + /** Default value for external execution flag. */ + public static final boolean DFLT_EXTERNAL_EXECUTION = false; + + /** Default value for the max parallel tasks. */ + public static final int DFLT_MAX_PARALLEL_TASKS = Runtime.getRuntime().availableProcessors(); + + /** Default value for the max task queue size. */ + public static final int DFLT_MAX_TASK_QUEUE_SIZE = 1000; + + /** Map reduce planner. */ + private HadoopMapReducePlanner planner; + + /** */ + private boolean extExecution = DFLT_EXTERNAL_EXECUTION; + + /** Finished job info TTL. */ + private long finishedJobInfoTtl = DFLT_FINISHED_JOB_INFO_TTL; + + /** */ + private int maxParallelTasks = DFLT_MAX_PARALLEL_TASKS; + + /** */ + private int maxTaskQueueSize = DFLT_MAX_TASK_QUEUE_SIZE; + + /** + * Default constructor. + */ + public HadoopConfiguration() { + // No-op. + } + + /** + * Copy constructor. + * + * @param cfg Configuration to copy. 
+ */ + public HadoopConfiguration(HadoopConfiguration cfg) { + // Preserve alphabetic order. + extExecution = cfg.isExternalExecution(); + finishedJobInfoTtl = cfg.getFinishedJobInfoTtl(); + planner = cfg.getMapReducePlanner(); + maxParallelTasks = cfg.getMaxParallelTasks(); + maxTaskQueueSize = cfg.getMaxTaskQueueSize(); + } + + /** + * Gets max number of local tasks that may be executed in parallel. + * + * @return Max number of local tasks that may be executed in parallel. + */ + public int getMaxParallelTasks() { + return maxParallelTasks; + } + + /** + * Sets max number of local tasks that may be executed in parallel. + * + * @param maxParallelTasks Max number of local tasks that may be executed in parallel. + */ + public void setMaxParallelTasks(int maxParallelTasks) { + this.maxParallelTasks = maxParallelTasks; + } + + /** + * Gets max task queue size. + * + * @return Max task queue size. + */ + public int getMaxTaskQueueSize() { + return maxTaskQueueSize; + } + + /** + * Sets max task queue size. + * + * @param maxTaskQueueSize Max task queue size. + */ + public void setMaxTaskQueueSize(int maxTaskQueueSize) { + this.maxTaskQueueSize = maxTaskQueueSize; + } + + /** + * Gets finished job info time-to-live in milliseconds. + * + * @return Finished job info time-to-live. + */ + public long getFinishedJobInfoTtl() { + return finishedJobInfoTtl; + } + + /** + * Sets finished job info time-to-live. + * + * @param finishedJobInfoTtl Finished job info time-to-live. + */ + public void setFinishedJobInfoTtl(long finishedJobInfoTtl) { + this.finishedJobInfoTtl = finishedJobInfoTtl; + } + + /** + * Gets external task execution flag. If {@code true}, hadoop job tasks will be executed in an external + * (relative to node) process. + * + * @return {@code True} if external execution. + */ + public boolean isExternalExecution() { + return extExecution; + } + + /** + * Sets external task execution flag. 
+ * + * @param extExecution {@code True} if tasks should be executed in an external process. + * @see #isExternalExecution() + */ + public void setExternalExecution(boolean extExecution) { + this.extExecution = extExecution; + } + + /** + * Gets Hadoop map-reduce planner, a component which defines job execution plan based on job + * configuration and current grid topology. + * + * @return Map-reduce planner. + */ + public HadoopMapReducePlanner getMapReducePlanner() { + return planner; + } + + /** + * Sets Hadoop map-reduce planner, a component which defines job execution plan based on job + * configuration and current grid topology. + * + * @param planner Map-reduce planner. + */ + public void setMapReducePlanner(HadoopMapReducePlanner planner) { + this.planner = planner; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopConfiguration.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index cf88778..1036e0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -20,7 +20,6 @@ package org.apache.ignite.configuration; import org.apache.ignite.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.managers.eventstorage.*; -import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; @@ -354,7 +353,7 @@ public class IgniteConfiguration { private 
ServiceConfiguration[] svcCfgs; /** Hadoop configuration. */ - private GridHadoopConfiguration hadoopCfg; + private HadoopConfiguration hadoopCfg; /** Client access configuration. */ private ConnectorConfiguration connectorCfg = new ConnectorConfiguration(); @@ -1768,7 +1767,7 @@ public class IgniteConfiguration { * * @return Hadoop configuration. */ - public GridHadoopConfiguration getHadoopConfiguration() { + public HadoopConfiguration getHadoopConfiguration() { return hadoopCfg; } @@ -1777,7 +1776,7 @@ public class IgniteConfiguration { * * @param hadoopCfg Hadoop configuration. */ - public void setHadoopConfiguration(GridHadoopConfiguration hadoopCfg) { + public void setHadoopConfiguration(HadoopConfiguration hadoopCfg) { this.hadoopCfg = hadoopCfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index cb9ffa1..30ba883 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -282,7 +282,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @return Hadoop processor. */ - public IgniteHadoopProcessorAdapter hadoop(); + public HadoopProcessorAdapter hadoop(); /** * Gets utility cache pool. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 756c16a..e80df0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -230,7 +230,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - private IgniteHadoopProcessorAdapter hadoopProc; + private HadoopProcessorAdapter hadoopProc; /** */ @GridToStringExclude @@ -456,8 +456,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable streamProc = (GridStreamProcessor)comp; else if (comp instanceof GridContinuousProcessor) contProc = (GridContinuousProcessor)comp; - else if (comp instanceof IgniteHadoopProcessorAdapter) - hadoopProc = (IgniteHadoopProcessorAdapter)comp; + else if (comp instanceof HadoopProcessorAdapter) + hadoopProc = (HadoopProcessorAdapter)comp; else if (comp instanceof GridPortableProcessor) portableProc = (GridPortableProcessor)comp; else if (comp instanceof IgnitePluginProcessor) @@ -680,7 +680,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public IgniteHadoopProcessorAdapter hadoop() { + @Override public HadoopProcessorAdapter hadoop() { return hadoopProc; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index a51800e..0e5c1cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -35,7 +35,7 @@ public enum IgniteComponentType { /** Hadoop. */ HADOOP( - "org.apache.ignite.internal.processors.hadoop.IgniteHadoopNoopProcessor", + "org.apache.ignite.internal.processors.hadoop.HadoopNoopProcessor", "org.apache.ignite.internal.processors.hadoop.HadoopProcessor", "ignite-hadoop" ), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java deleted file mode 100644 index f66b95a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopConfiguration.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Hadoop configuration. - */ -public class GridHadoopConfiguration { - /** Default finished job info time-to-live. */ - public static final long DFLT_FINISHED_JOB_INFO_TTL = 10_000; - - /** Default value for external execution flag. */ - public static final boolean DFLT_EXTERNAL_EXECUTION = false; - - /** Default value for the max parallel tasks. */ - public static final int DFLT_MAX_PARALLEL_TASKS = Runtime.getRuntime().availableProcessors(); - - /** Default value for the max task queue size. */ - public static final int DFLT_MAX_TASK_QUEUE_SIZE = 1000; - - /** Map reduce planner. */ - private GridHadoopMapReducePlanner planner; - - /** */ - private boolean extExecution = DFLT_EXTERNAL_EXECUTION; - - /** Finished job info TTL. */ - private long finishedJobInfoTtl = DFLT_FINISHED_JOB_INFO_TTL; - - /** */ - private int maxParallelTasks = DFLT_MAX_PARALLEL_TASKS; - - /** */ - private int maxTaskQueueSize = DFLT_MAX_TASK_QUEUE_SIZE; - - /** - * Default constructor. - */ - public GridHadoopConfiguration() { - // No-op. - } - - /** - * Copy constructor. - * - * @param cfg Configuration to copy. - */ - public GridHadoopConfiguration(GridHadoopConfiguration cfg) { - // Preserve alphabetic order. - extExecution = cfg.isExternalExecution(); - finishedJobInfoTtl = cfg.getFinishedJobInfoTtl(); - planner = cfg.getMapReducePlanner(); - maxParallelTasks = cfg.getMaxParallelTasks(); - maxTaskQueueSize = cfg.getMaxTaskQueueSize(); - } - - /** - * Gets max number of local tasks that may be executed in parallel. - * - * @return Max number of local tasks that may be executed in parallel. - */ - public int getMaxParallelTasks() { - return maxParallelTasks; - } - - /** - * Sets max number of local tasks that may be executed in parallel. 
- * - * @param maxParallelTasks Max number of local tasks that may be executed in parallel. - */ - public void setMaxParallelTasks(int maxParallelTasks) { - this.maxParallelTasks = maxParallelTasks; - } - - /** - * Gets max task queue size. - * - * @return Max task queue size. - */ - public int getMaxTaskQueueSize() { - return maxTaskQueueSize; - } - - /** - * Sets max task queue size. - * - * @param maxTaskQueueSize Max task queue size. - */ - public void setMaxTaskQueueSize(int maxTaskQueueSize) { - this.maxTaskQueueSize = maxTaskQueueSize; - } - - /** - * Gets finished job info time-to-live in milliseconds. - * - * @return Finished job info time-to-live. - */ - public long getFinishedJobInfoTtl() { - return finishedJobInfoTtl; - } - - /** - * Sets finished job info time-to-live. - * - * @param finishedJobInfoTtl Finished job info time-to-live. - */ - public void setFinishedJobInfoTtl(long finishedJobInfoTtl) { - this.finishedJobInfoTtl = finishedJobInfoTtl; - } - - /** - * Gets external task execution flag. If {@code true}, hadoop job tasks will be executed in an external - * (relative to node) process. - * - * @return {@code True} if external execution. - */ - public boolean isExternalExecution() { - return extExecution; - } - - /** - * Sets external task execution flag. - * - * @param extExecution {@code True} if tasks should be executed in an external process. - * @see #isExternalExecution() - */ - public void setExternalExecution(boolean extExecution) { - this.extExecution = extExecution; - } - - /** - * Gets Hadoop map-reduce planner, a component which defines job execution plan based on job - * configuration and current grid topology. - * - * @return Map-reduce planner. - */ - public GridHadoopMapReducePlanner getMapReducePlanner() { - return planner; - } - - /** - * Sets Hadoop map-reduce planner, a component which defines job execution plan based on job - * configuration and current grid topology. - * - * @param planner Map-reduce planner. 
- */ - public void setMapReducePlanner(GridHadoopMapReducePlanner planner) { - this.planner = planner; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopConfiguration.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java deleted file mode 100644 index 3d577b4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCounters.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import java.util.*; - -/** - * Counters store. - */ -public interface GridHadoopCounters { - /** - * Returns counter for the specified group and counter name. Creates new if it does not exist. - * - * @param grp Counter group name. 
- * @param name Counter name. - * @param cls Class for new instance creation if it's needed. - * @return The counter that was found or added or {@code null} if create is false. - */ - <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls); - - /** - * Returns all existing counters. - * - * @return Collection of counters. - */ - Collection<HadoopCounter> all(); - - /** - * Merges all counters from another store with existing counters. - * - * @param other Counters to merge with. - */ - void merge(GridHadoopCounters other); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java deleted file mode 100644 index ffc2057..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobId.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Job ID. - */ -public class GridHadoopJobId implements GridCacheInternal, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private UUID nodeId; - - /** */ - private int jobId; - - /** - * For {@link Externalizable}. - */ - public GridHadoopJobId() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param jobId Job ID. - */ - public GridHadoopJobId(UUID nodeId, int jobId) { - this.nodeId = nodeId; - this.jobId = jobId; - } - - public UUID globalId() { - return nodeId; - } - - public int localId() { - return jobId; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, nodeId); - out.writeInt(jobId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeId = U.readUuid(in); - jobId = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopJobId that = (GridHadoopJobId) o; - - if (jobId != that.jobId) - return false; - - if (!nodeId.equals(that.nodeId)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return 31 * nodeId.hashCode() + jobId; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return nodeId + "_" + jobId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java deleted file mode 100644 index 6c75e5d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobInfo.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Compact job description. - */ -public interface GridHadoopJobInfo extends Serializable { - /** - * Gets optional configuration property for the job. - * - * @param name Property name. - * @return Value or {@code null} if none. - */ - @Nullable public String property(String name); - - /** - * Checks whether job has combiner. - * - * @return {@code true} If job has combiner. - */ - public boolean hasCombiner(); - - /** - * Checks whether job has reducer. - * Actual number of reducers will be in {@link GridHadoopMapReducePlan#reducers()}. - * - * @return Number of reducer. 
- */ - public boolean hasReducer(); - - /** - * Creates new job instance for the given ID. - * {@link GridHadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution. - * This method will be called once for the same ID on one node, though it can be called on the same host - * multiple times from different processes (in case of multiple nodes on the same host or external execution). - * - * @param jobId Job ID. - * @param log Logger. - * @return Job. - * @throws IgniteCheckedException If failed. - */ - HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; - - /** - * @return Number of reducers configured for job. - */ - public int reducers(); - - /** - * Gets job name. - * - * @return Job name. - */ - public String jobName(); - - /** - * Gets user name. - * - * @return User name. - */ - public String user(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java deleted file mode 100644 index cc122bb..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobPhase.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Job run phase. - */ -public enum GridHadoopJobPhase { - /** Job is running setup task. */ - PHASE_SETUP, - - /** Job is running map and combine tasks. */ - PHASE_MAP, - - /** Job has finished all map tasks and running reduce tasks. */ - PHASE_REDUCE, - - /** Job is stopping due to exception during any of the phases. */ - PHASE_CANCELLING, - - /** Job has finished execution. */ - PHASE_COMPLETE -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java deleted file mode 100644 index 0ece051..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobProperty.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jetbrains.annotations.*; - -/** - * Enumeration of optional properties supported by Ignite for Apache Hadoop. - */ -public enum GridHadoopJobProperty { - /** - * Initial size for hashmap which stores output of mapper and will be used as input of combiner. - * <p> - * Setting it right allows to avoid rehashing. - */ - COMBINER_HASHMAP_SIZE, - - /** - * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. - * <p> - * Setting it right allows to avoid rehashing. - */ - PARTITION_HASHMAP_SIZE, - - /** - * Specifies number of concurrently running mappers for external execution mode. - * <p> - * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. - */ - EXTERNAL_CONCURRENT_MAPPERS, - - /** - * Specifies number of concurrently running reducers for external execution mode. - * <p> - * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. - */ - EXTERNAL_CONCURRENT_REDUCERS, - - /** - * Delay in milliseconds after which Ignite server will reply job status. - */ - JOB_STATUS_POLL_DELAY, - - /** - * Size in bytes of single memory page which will be allocated for data structures in shuffle. - * <p> - * By default is {@code 32 * 1024}. - */ - SHUFFLE_OFFHEAP_PAGE_SIZE, - - /** - * If set to {@code true} then input for combiner will not be sorted by key. 
- * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} - * and {@link Object#hashCode()} methods of key must be implemented consistently with - * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. - * <p> - * By default is {@code false}. - */ - SHUFFLE_COMBINER_NO_SORTING, - - /** - * If set to {@code true} then input for reducer will not be sorted by key. - * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} - * and {@link Object#hashCode()} methods of key must be implemented consistently with - * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. - * <p> - * By default is {@code false}. - */ - SHUFFLE_REDUCER_NO_SORTING; - - /** */ - private final String ptyName; - - /** - * - */ - GridHadoopJobProperty() { - ptyName = "ignite." + name().toLowerCase().replace('_', '.'); - } - - /** - * @return Property name. - */ - public String propertyName() { - return ptyName; - } - - /** - * @param jobInfo Job info. - * @param pty Property. - * @param dflt Default value. - * @return Property value. - */ - public static String get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, @Nullable String dflt) { - String res = jobInfo.property(pty.propertyName()); - - return res == null ? dflt : res; - } - - /** - * @param jobInfo Job info. - * @param pty Property. - * @param dflt Default value. - * @return Property value. - */ - public static int get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, int dflt) { - String res = jobInfo.property(pty.propertyName()); - - return res == null ? dflt : Integer.parseInt(res); - } - - /** - * @param jobInfo Job info. - * @param pty Property. - * @param dflt Default value. - * @return Property value. 
- */ - public static boolean get(GridHadoopJobInfo jobInfo, GridHadoopJobProperty pty, boolean dflt) { - String res = jobInfo.property(pty.propertyName()); - - return res == null ? dflt : Boolean.parseBoolean(res); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java deleted file mode 100644 index 02ea883..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobStatus.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Hadoop job status. - */ -public class GridHadoopJobStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. 
*/ - private GridHadoopJobId jobId; - - /** Job name. */ - private String jobName; - - /** User. */ - private String usr; - - /** Pending mappers count. */ - private int pendingMapperCnt; - - /** Pending reducers count. */ - private int pendingReducerCnt; - - /** Total mappers count. */ - private int totalMapperCnt; - - /** Total reducers count. */ - private int totalReducerCnt; - /** Phase. */ - private GridHadoopJobPhase jobPhase; - - /** */ - private boolean failed; - - /** Version. */ - private long ver; - - /** - * {@link Externalizable} support. - */ - public GridHadoopJobStatus() { - // No-op. - } - - /** - * Constructor. - * - * @param jobId Job ID. - * @param jobName Job name. - * @param usr User. - * @param pendingMapperCnt Pending mappers count. - * @param pendingReducerCnt Pending reducers count. - * @param totalMapperCnt Total mappers count. - * @param totalReducerCnt Total reducers count. - * @param jobPhase Job phase. - * @param failed Failed. - * @param ver Version. - */ - public GridHadoopJobStatus( - GridHadoopJobId jobId, - String jobName, - String usr, - int pendingMapperCnt, - int pendingReducerCnt, - int totalMapperCnt, - int totalReducerCnt, - GridHadoopJobPhase jobPhase, - boolean failed, - long ver - ) { - this.jobId = jobId; - this.jobName = jobName; - this.usr = usr; - this.pendingMapperCnt = pendingMapperCnt; - this.pendingReducerCnt = pendingReducerCnt; - this.totalMapperCnt = totalMapperCnt; - this.totalReducerCnt = totalReducerCnt; - this.jobPhase = jobPhase; - this.failed = failed; - this.ver = ver; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Job name. - */ - public String jobName() { - return jobName; - } - - /** - * @return User. - */ - public String user() { - return usr; - } - - /** - * @return Pending mappers count. - */ - public int pendingMapperCnt() { - return pendingMapperCnt; - } - - /** - * @return Pending reducers count. 
- */ - public int pendingReducerCnt() { - return pendingReducerCnt; - } - - /** - * @return Total mappers count. - */ - public int totalMapperCnt() { - return totalMapperCnt; - } - - /** - * @return Total reducers count. - */ - public int totalReducerCnt() { - return totalReducerCnt; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @return Job phase. - */ - public GridHadoopJobPhase jobPhase() { - return jobPhase; - } - - /** - * @return {@code true} If the job failed. - */ - public boolean isFailed() { - return failed; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopJobStatus.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(jobId); - U.writeString(out, jobName); - U.writeString(out, usr); - out.writeInt(pendingMapperCnt); - out.writeInt(pendingReducerCnt); - out.writeInt(totalMapperCnt); - out.writeInt(totalReducerCnt); - out.writeObject(jobPhase); - out.writeBoolean(failed); - out.writeLong(ver); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = (GridHadoopJobId)in.readObject(); - jobName = U.readString(in); - usr = U.readString(in); - pendingMapperCnt = in.readInt(); - pendingReducerCnt = in.readInt(); - totalMapperCnt = in.readInt(); - totalReducerCnt = in.readInt(); - jobPhase = (GridHadoopJobPhase)in.readObject(); - failed = in.readBoolean(); - ver = in.readLong(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java deleted file mode 100644 index bb638fc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlan.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Map-reduce job execution plan. - */ -public interface GridHadoopMapReducePlan extends Serializable { - /** - * Gets collection of file blocks for which mappers should be executed. - * - * @param nodeId Node ID to check. - * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. - */ - @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); - - /** - * Gets reducer IDs that should be started on given node. - * - * @param nodeId Node ID to check. - * @return Array of reducer IDs. - */ - @Nullable public int[] reducers(UUID nodeId); - - /** - * Gets collection of all node IDs involved in map part of job execution. - * - * @return Collection of node IDs. 
- */ - public Collection<UUID> mapperNodeIds(); - - /** - * Gets collection of all node IDs involved in reduce part of job execution. - * - * @return Collection of node IDs. - */ - public Collection<UUID> reducerNodeIds(); - - /** - * Gets overall number of mappers for the job. - * - * @return Number of mappers. - */ - public int mappers(); - - /** - * Gets overall number of reducers for the job. - * - * @return Number of reducers. - */ - public int reducers(); - - /** - * Gets node ID for reducer. - * - * @param reducer Reducer. - * @return Node ID. - */ - public UUID nodeForReducer(int reducer); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java deleted file mode 100644 index 0119eec..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Map-reduce execution planner. - */ -public interface GridHadoopMapReducePlanner { - /** - * Prepares map-reduce execution plan for the given job and topology. - * - * @param job Job. - * @param top Topology. - * @param oldPlan Old plan in case of partial failure. - * @return Map reduce plan. - */ - public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java deleted file mode 100644 index 7107f17..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task info. - */ -public class GridHadoopTaskInfo implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private HadoopTaskType type; - - /** */ - private GridHadoopJobId jobId; - - /** */ - private int taskNum; - - /** */ - private int attempt; - - /** */ - private HadoopInputSplit inputSplit; - - /** - * For {@link Externalizable}. - */ - public GridHadoopTaskInfo() { - // No-op. - } - - /** - * Creates new task info. - * - * @param type Task type. - * @param jobId Job id. - * @param taskNum Task number. - * @param attempt Attempt for this task. - * @param inputSplit Input split. 
- */ - public GridHadoopTaskInfo(HadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt, - @Nullable HadoopInputSplit inputSplit) { - this.type = type; - this.jobId = jobId; - this.taskNum = taskNum; - this.attempt = attempt; - this.inputSplit = inputSplit; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(type.ordinal()); - out.writeObject(jobId); - out.writeInt(taskNum); - out.writeInt(attempt); - out.writeObject(inputSplit); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - type = HadoopTaskType.fromOrdinal(in.readByte()); - jobId = (GridHadoopJobId)in.readObject(); - taskNum = in.readInt(); - attempt = in.readInt(); - inputSplit = (HadoopInputSplit)in.readObject(); - } - - /** - * @return Type. - */ - public HadoopTaskType type() { - return type; - } - - /** - * @return Job id. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Task number. - */ - public int taskNumber() { - return taskNum; - } - - /** - * @return Attempt. - */ - public int attempt() { - return attempt; - } - - /** - * @return Input split. 
- */ - @Nullable public HadoopInputSplit inputSplit() { - return inputSplit; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridHadoopTaskInfo)) - return false; - - GridHadoopTaskInfo that = (GridHadoopTaskInfo)o; - - return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = type.hashCode(); - - res = 31 * res + jobId.hashCode(); - res = 31 * res + taskNum; - res = 31 * res + attempt; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskInfo.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java index 1df1378..9efc4a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; import org.jetbrains.annotations.*; /** @@ -30,14 +32,14 @@ public interface Hadoop { * * @return Hadoop module configuration. */ - public GridHadoopConfiguration configuration(); + public HadoopConfiguration configuration(); /** * Generate next job ID. * * @return Next job ID. */ - public GridHadoopJobId nextJobId(); + public HadoopJobId nextJobId(); /** * Submits job to job tracker. 
@@ -46,7 +48,7 @@ public interface Hadoop { * @param jobInfo Job info to submit. * @return Execution future. */ - public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo); + public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo); /** * Gets Hadoop job execution status. @@ -55,7 +57,7 @@ public interface Hadoop { * @return Job execution status or {@code null} in case job with the given ID is not found. * @throws IgniteCheckedException If failed. */ - @Nullable public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException; + @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException; /** * Returns job counters. @@ -64,7 +66,7 @@ public interface Hadoop { * @return Job counters object. * @throws IgniteCheckedException If failed. */ - public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException; + public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException; /** * Gets Hadoop finish future for particular job. @@ -73,7 +75,7 @@ public interface Hadoop { * @return Job finish future or {@code null} in case job with the given ID is not found. * @throws IgniteCheckedException If failed. */ - @Nullable public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException; + @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException; /** * Kills job. @@ -82,5 +84,5 @@ public interface Hadoop { * @return {@code True} if job was killed. * @throws IgniteCheckedException If failed. 
*/ - public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException; + public boolean kill(HadoopJobId jobId) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java deleted file mode 100644 index 581144a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Hadoop counter. - */ -public interface HadoopCounter { - /** - * Gets name. - * - * @return Name of the counter. - */ - public String name(); - - /** - * Gets counter group. - * - * @return Counter group's name. - */ - public String group(); - - /** - * Merge the given counter to this counter. - * - * @param cntr Counter to merge into this counter. 
- */ - public void merge(HadoopCounter cntr); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java deleted file mode 100644 index 0d33fd2..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterWriter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * The object that writes some system counters to some storage for each running job. This operation is a part of - * whole statistics collection process. - */ -public interface HadoopCounterWriter { - /** - * Writes counters of given job to some statistics storage. - * - * @param jobInfo Job info. - * @param jobId Job id. - * @param cntrs Counters. - * @throws IgniteCheckedException If failed. 
- */ - public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java index facb0ce..65cb48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -30,14 +30,14 @@ public interface HadoopJob { * * @return Job ID. */ - public GridHadoopJobId id(); + public HadoopJobId id(); /** * Gets job information. * * @return Job information. */ - public GridHadoopJobInfo info(); + public HadoopJobInfo info(); /** * Gets collection of input splits for this job. @@ -53,7 +53,7 @@ public interface HadoopJob { * @return Task Context. * @throws IgniteCheckedException If failed. */ - public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException; + public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException; /** * Does all the needed initialization for the job. Will be called on each node where tasks for this job must @@ -85,7 +85,7 @@ public interface HadoopJob { * @param info Task info. * @throws IgniteCheckedException If failed. */ - public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; /** * Cleans up local environment of the task. @@ -93,7 +93,7 @@ public interface HadoopJob { * @param info Task info. * @throws IgniteCheckedException If failed. 
*/ - public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException; + public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; /** * Cleans up the job staging directory. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java new file mode 100644 index 0000000..b0593a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Job ID. 
+ */ +public class HadoopJobId implements GridCacheInternal, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** */ + private int jobId; + + /** + * For {@link Externalizable}. + */ + public HadoopJobId() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param jobId Job ID. + */ + public HadoopJobId(UUID nodeId, int jobId) { + this.nodeId = nodeId; + this.jobId = jobId; + } + + public UUID globalId() { + return nodeId; + } + + public int localId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, nodeId); + out.writeInt(jobId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeId = U.readUuid(in); + jobId = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopJobId that = (HadoopJobId) o; + + if (jobId != that.jobId) + return false; + + if (!nodeId.equals(that.nodeId)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * nodeId.hashCode() + jobId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return nodeId + "_" + jobId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java new file mode 100644 index 0000000..51faf5d --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Compact job description. + */ +public interface HadoopJobInfo extends Serializable { + /** + * Gets optional configuration property for the job. + * + * @param name Property name. + * @return Value or {@code null} if none. + */ + @Nullable public String property(String name); + + /** + * Checks whether job has combiner. + * + * @return {@code true} If job has combiner. + */ + public boolean hasCombiner(); + + /** + * Checks whether job has reducer. + * Actual number of reducers will be in {@link HadoopMapReducePlan#reducers()}. + * + * @return {@code true} If job has reducer. + */ + public boolean hasReducer(); + + /** + * Creates new job instance for the given ID. + * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution.
+ * This method will be called once for the same ID on one node, though it can be called on the same host + * multiple times from different processes (in case of multiple nodes on the same host or external execution). + * + * @param jobId Job ID. + * @param log Logger. + * @return Job. + * @throws IgniteCheckedException If failed. + */ + HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + + /** + * @return Number of reducers configured for job. + */ + public int reducers(); + + /** + * Gets job name. + * + * @return Job name. + */ + public String jobName(); + + /** + * Gets user name. + * + * @return User name. + */ + public String user(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java new file mode 100644 index 0000000..8c932bb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Job run phase. + */ +public enum HadoopJobPhase { + /** Job is running setup task. */ + PHASE_SETUP, + + /** Job is running map and combine tasks. */ + PHASE_MAP, + + /** Job has finished all map tasks and running reduce tasks. */ + PHASE_REDUCE, + + /** Job is stopping due to exception during any of the phases. */ + PHASE_CANCELLING, + + /** Job has finished execution. */ + PHASE_COMPLETE +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java new file mode 100644 index 0000000..1a58624 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.*; + +/** + * Enumeration of optional properties supported by Ignite for Apache Hadoop. + */ +public enum HadoopJobProperty { + /** + * Initial size for hashmap which stores output of mapper and will be used as input of combiner. + * <p> + * Setting it right avoids rehashing. + */ + COMBINER_HASHMAP_SIZE, + + /** + * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. + * <p> + * Setting it right avoids rehashing. + */ + PARTITION_HASHMAP_SIZE, + + /** + * Specifies number of concurrently running mappers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_MAPPERS, + + /** + * Specifies number of concurrently running reducers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_REDUCERS, + + /** + * Delay in milliseconds after which Ignite server will reply with job status. + */ + JOB_STATUS_POLL_DELAY, + + /** + * Size in bytes of single memory page which will be allocated for data structures in shuffle. + * <p> + * By default is {@code 32 * 1024}. + */ + SHUFFLE_OFFHEAP_PAGE_SIZE, + + /** + * If set to {@code true} then input for combiner will not be sorted by key. + * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_COMBINER_NO_SORTING, + + /** + * If set to {@code true} then input for reducer will not be sorted by key.
+ * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_REDUCER_NO_SORTING; + + /** */ + private final String ptyName; + + /** + * + */ + HadoopJobProperty() { + ptyName = "ignite." + name().toLowerCase().replace('_', '.'); + } + + /** + * @return Property name. + */ + public String propertyName() { + return ptyName; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static String get(HadoopJobInfo jobInfo, HadoopJobProperty pty, @Nullable String dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : res; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static int get(HadoopJobInfo jobInfo, HadoopJobProperty pty, int dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : Integer.parseInt(res); + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static boolean get(HadoopJobInfo jobInfo, HadoopJobProperty pty, boolean dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? 
dflt : Boolean.parseBoolean(res); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java new file mode 100644 index 0000000..752556d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Hadoop job status. + */ +public class HadoopJobStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private HadoopJobId jobId; + + /** Job name. */ + private String jobName; + + /** User. */ + private String usr; + + /** Pending mappers count. */ + private int pendingMapperCnt; + + /** Pending reducers count. 
*/ + private int pendingReducerCnt; + + /** Total mappers count. */ + private int totalMapperCnt; + + /** Total reducers count. */ + private int totalReducerCnt; + /** Phase. */ + private HadoopJobPhase jobPhase; + + /** */ + private boolean failed; + + /** Version. */ + private long ver; + + /** + * {@link Externalizable} support. + */ + public HadoopJobStatus() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job ID. + * @param jobName Job name. + * @param usr User. + * @param pendingMapperCnt Pending mappers count. + * @param pendingReducerCnt Pending reducers count. + * @param totalMapperCnt Total mappers count. + * @param totalReducerCnt Total reducers count. + * @param jobPhase Job phase. + * @param failed Failed. + * @param ver Version. + */ + public HadoopJobStatus( + HadoopJobId jobId, + String jobName, + String usr, + int pendingMapperCnt, + int pendingReducerCnt, + int totalMapperCnt, + int totalReducerCnt, + HadoopJobPhase jobPhase, + boolean failed, + long ver + ) { + this.jobId = jobId; + this.jobName = jobName; + this.usr = usr; + this.pendingMapperCnt = pendingMapperCnt; + this.pendingReducerCnt = pendingReducerCnt; + this.totalMapperCnt = totalMapperCnt; + this.totalReducerCnt = totalReducerCnt; + this.jobPhase = jobPhase; + this.failed = failed; + this.ver = ver; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Job name. + */ + public String jobName() { + return jobName; + } + + /** + * @return User. + */ + public String user() { + return usr; + } + + /** + * @return Pending mappers count. + */ + public int pendingMapperCnt() { + return pendingMapperCnt; + } + + /** + * @return Pending reducers count. + */ + public int pendingReducerCnt() { + return pendingReducerCnt; + } + + /** + * @return Total mappers count. + */ + public int totalMapperCnt() { + return totalMapperCnt; + } + + /** + * @return Total reducers count. 
+ */ + public int totalReducerCnt() { + return totalReducerCnt; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @return Job phase. + */ + public HadoopJobPhase jobPhase() { + return jobPhase; + } + + /** + * @return {@code true} If the job failed. + */ + public boolean isFailed() { + return failed; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopJobStatus.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(jobId); + U.writeString(out, jobName); + U.writeString(out, usr); + out.writeInt(pendingMapperCnt); + out.writeInt(pendingReducerCnt); + out.writeInt(totalMapperCnt); + out.writeInt(totalReducerCnt); + out.writeObject(jobPhase); + out.writeBoolean(failed); + out.writeLong(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = (HadoopJobId)in.readObject(); + jobName = U.readString(in); + usr = U.readString(in); + pendingMapperCnt = in.readInt(); + pendingReducerCnt = in.readInt(); + totalMapperCnt = in.readInt(); + totalReducerCnt = in.readInt(); + jobPhase = (HadoopJobPhase)in.readObject(); + failed = in.readBoolean(); + ver = in.readLong(); + } +}