http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java new file mode 100644 index 0000000..3da2fb1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Map-reduce job execution plan. + */ +public interface HadoopMapReducePlan extends Serializable { + /** + * Gets collection of file blocks for which mappers should be executed. + * + * @param nodeId Node ID to check. + * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. + */ + @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); + + /** + * Gets reducer IDs that should be started on given node. + * + * @param nodeId Node ID to check. + * @return Array of reducer IDs. + */ + @Nullable public int[] reducers(UUID nodeId); + + /** + * Gets collection of all node IDs involved in map part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> mapperNodeIds(); + + /** + * Gets collection of all node IDs involved in reduce part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> reducerNodeIds(); + + /** + * Gets overall number of mappers for the job. + * + * @return Number of mappers. + */ + public int mappers(); + + /** + * Gets overall number of reducers for the job. + * + * @return Number of reducers. + */ + public int reducers(); + + /** + * Gets node ID for reducer. + * + * @param reducer Reducer. + * @return Node ID. + */ + public UUID nodeForReducer(int reducer); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java new file mode 100644 index 0000000..ab885fe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Map-reduce execution planner. + */ +public interface HadoopMapReducePlanner { + /** + * Prepares map-reduce execution plan for the given job and topology. + * + * @param job Job. + * @param top Topology. + * @param oldPlan Old plan in case of partial failure. + * @return Map reduce plan. + */ + public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java new file mode 100644 index 0000000..eb84d00 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.util.future.*; + +/** + * Hadoop processor. + */ +public class HadoopNoopProcessor extends HadoopProcessorAdapter { + /** + * @param ctx Kernal context. + */ + public HadoopNoopProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + throw new IllegalStateException("Hadoop module is not found in class path."); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration config() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopJobId nextJobId() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { + return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available.")); + } + + /** {@inheritDoc} */ + @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters(HadoopJobId jobId) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java new file mode 100644 index 0000000..44ff8be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +/** + * Hadoop processor. + */ +public abstract class HadoopProcessorAdapter extends GridProcessorAdapter { + /** + * @param ctx Kernal context. + */ + protected HadoopProcessorAdapter(GridKernalContext ctx) { + super(ctx); + } + + /** + * @return Hadoop facade. + */ + public abstract Hadoop hadoop(); + + /** + * @return Hadoop configuration. + */ + public abstract HadoopConfiguration config(); + + /** + * @return Collection of generated IDs. + */ + public abstract HadoopJobId nextJobId(); + + /** + * Submits job to job tracker. + * + * @param jobId Job ID to submit. + * @param jobInfo Job info to submit. + * @return Execution future. + */ + public abstract IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo); + + /** + * Gets Hadoop job execution status. + * + * @param jobId Job ID to get status for. + * @return Job execution status. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Returns Hadoop job counters. + * + * @param jobId Job ID to get counters for. + * @return Job counters. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Gets Hadoop job finish future. + * + * @param jobId Job ID. + * @return Job finish future or {@code null}. + * @throws IgniteCheckedException If failed. + */ + public abstract IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public abstract boolean kill(HadoopJobId jobId) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java index c6a409f..3ce83ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java @@ -26,14 +26,14 @@ import java.io.*; */ public abstract class HadoopTask { /** */ - private GridHadoopTaskInfo taskInfo; + private HadoopTaskInfo taskInfo; /** * Creates task. * * @param taskInfo Task info. */ - protected HadoopTask(GridHadoopTaskInfo taskInfo) { + protected HadoopTask(HadoopTaskInfo taskInfo) { assert taskInfo != null; this.taskInfo = taskInfo; @@ -52,7 +52,7 @@ public abstract class HadoopTask { * * @return Task info. */ - public GridHadoopTaskInfo info() { + public HadoopTaskInfo info() { return taskInfo; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index 4b66a92..371fd81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; import java.util.*; @@ -35,13 +36,13 @@ public abstract class HadoopTaskContext { private HadoopTaskOutput output; /** */ - private GridHadoopTaskInfo taskInfo; + private HadoopTaskInfo taskInfo; /** * @param taskInfo Task info. * @param job Job. */ - protected HadoopTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob job) { + protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) { this.taskInfo = taskInfo; this.job = job; } @@ -51,7 +52,7 @@ public abstract class HadoopTaskContext { * * @return Task info. */ - public GridHadoopTaskInfo taskInfo() { + public HadoopTaskInfo taskInfo() { return taskInfo; } @@ -60,7 +61,7 @@ public abstract class HadoopTaskContext { * * @param info Task info. */ - public void taskInfo(GridHadoopTaskInfo info) { + public void taskInfo(HadoopTaskInfo info) { taskInfo = info; } @@ -103,7 +104,7 @@ public abstract class HadoopTaskContext { * * @return Unmodifiable collection of counters. */ - public abstract GridHadoopCounters counters(); + public abstract HadoopCounters counters(); /** * Sets input of the task. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java new file mode 100644 index 0000000..eb82cb4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task info. + */ +public class HadoopTaskInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private HadoopTaskType type; + + /** */ + private HadoopJobId jobId; + + /** */ + private int taskNum; + + /** */ + private int attempt; + + /** */ + private HadoopInputSplit inputSplit; + + /** + * For {@link Externalizable}. + */ + public HadoopTaskInfo() { + // No-op. + } + + /** + * Creates new task info. + * + * @param type Task type. + * @param jobId Job id. + * @param taskNum Task number. + * @param attempt Attempt for this task. + * @param inputSplit Input split. + */ + public HadoopTaskInfo(HadoopTaskType type, HadoopJobId jobId, int taskNum, int attempt, + @Nullable HadoopInputSplit inputSplit) { + this.type = type; + this.jobId = jobId; + this.taskNum = taskNum; + this.attempt = attempt; + this.inputSplit = inputSplit; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte(type.ordinal()); + out.writeObject(jobId); + out.writeInt(taskNum); + out.writeInt(attempt); + out.writeObject(inputSplit); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + type = HadoopTaskType.fromOrdinal(in.readByte()); + jobId = (HadoopJobId)in.readObject(); + taskNum = in.readInt(); + attempt = in.readInt(); + inputSplit = (HadoopInputSplit)in.readObject(); + } + + /** + * @return Type. + */ + public HadoopTaskType type() { + return type; + } + + /** + * @return Job id. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Task number. + */ + public int taskNumber() { + return taskNum; + } + + /** + * @return Attempt. + */ + public int attempt() { + return attempt; + } + + /** + * @return Input split. + */ + @Nullable public HadoopInputSplit inputSplit() { + return inputSplit; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopTaskInfo)) + return false; + + HadoopTaskInfo that = (HadoopTaskInfo)o; + + return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = type.hashCode(); + + res = 31 * res + jobId.hashCode(); + res = 31 * res + taskNum; + res = 31 * res + attempt; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java deleted file mode 100644 index caa9194..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopNoopProcessor.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.future.*; - -/** - * Hadoop processor. - */ -public class IgniteHadoopNoopProcessor extends IgniteHadoopProcessorAdapter { - /** - * @param ctx Kernal context. - */ - public IgniteHadoopNoopProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public Hadoop hadoop() { - throw new IllegalStateException("Hadoop module is not found in class path."); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration config() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available.")); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters(GridHadoopJobId jobId) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java deleted file mode 100644 index d40d5e4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessorAdapter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; - -/** - * Hadoop processor. - */ -public abstract class IgniteHadoopProcessorAdapter extends GridProcessorAdapter { - /** - * @param ctx Kernal context. - */ - protected IgniteHadoopProcessorAdapter(GridKernalContext ctx) { - super(ctx); - } - - /** - * @return Hadoop facade. - */ - public abstract Hadoop hadoop(); - - /** - * @return Hadoop configuration. - */ - public abstract GridHadoopConfiguration config(); - - /** - * @return Collection of generated IDs. - */ - public abstract GridHadoopJobId nextJobId(); - - /** - * Submits job to job tracker. - * - * @param jobId Job ID to submit. - * @param jobInfo Job info to submit. - * @return Execution future. - */ - public abstract IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo); - - /** - * Gets Hadoop job execution status. - * - * @param jobId Job ID to get status for. - * @return Job execution status. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Returns Hadoop job counters. - * - * @param jobId Job ID to get counters for. - * @return Job counters. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Gets Hadoop job finish future. - * - * @param jobId Job ID. - * @return Job finish future or {@code null}. - * @throws IgniteCheckedException If failed. - */ - public abstract IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * Kills job. - * - * @param jobId Job ID. - * @return {@code True} if job was killed. - * @throws IgniteCheckedException If failed. - */ - public abstract boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java new file mode 100644 index 0000000..918c3bc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +/** + * Hadoop counter. + */ +public interface HadoopCounter { + /** + * Gets name. + * + * @return Name of the counter. + */ + public String name(); + + /** + * Gets counter group. + * + * @return Counter group's name. + */ + public String group(); + + /** + * Merge the given counter to this counter. + * + * @param cntr Counter to merge into this counter. + */ + public void merge(HadoopCounter cntr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java new file mode 100644 index 0000000..ce67c57 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * The object that writes some system counters to some storage for each running job. This operation is a part of + * whole statistics collection process. + */ +public interface HadoopCounterWriter { + /** + * Writes counters of given job to some statistics storage. + * + * @param jobInfo Job info. + * @param jobId Job id. + * @param cntrs Counters. + * @throws IgniteCheckedException If failed. + */ + public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java new file mode 100644 index 0000000..706ba77 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounters.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.counter; + +import java.util.*; + +/** + * Counters store. + */ +public interface HadoopCounters { + /** + * Returns counter for the specified group and counter name. Creates new if it does not exist. + * + * @param grp Counter group name. + * @param name Counter name. + * @param cls Class for new instance creation if it's needed. + * @return The counter that was found or added or {@code null} if create is false. + */ + <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls); + + /** + * Returns all existing counters. + * + * @return Collection of counters. + */ + Collection<HadoopCounter> all(); + + /** + * Merges all counters from another store with existing counters. + * + * @param other Counters to merge with. + */ + void merge(HadoopCounters other); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 01e554c..66e9761 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.util.typedef.*; import java.io.*; @@ -48,7 +49,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; /** {@inheritDoc} */ - @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) + @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException { Configuration hadoopCfg = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 2f484d8..1856e41 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -73,7 +73,7 @@ public class HadoopClassLoader extends URLClassLoader { * @param cls Class name. * @return {@code true} if we need to check this class. */ - private static boolean isIgfsHadoop(String cls) { + private static boolean isHadoopIgfs(String cls) { String ignitePackagePrefix = "org.apache.ignite"; int len = ignitePackagePrefix.length(); @@ -100,7 +100,7 @@ public class HadoopClassLoader extends URLClassLoader { return loadClassExplicitly(name, resolve); } - if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. + if (isHadoopIgfs(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. Boolean hasDeps = cache.get(name); if (hasDeps == null) { @@ -224,7 +224,7 @@ public class HadoopClassLoader extends URLClassLoader { if (in == null) // The class is external itself, it must be loaded from this class loader. return true; - if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies. + if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies. return false; final ClassReader rdr; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java index d897b6c..68f0baf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; import org.apache.ignite.internal.processors.hadoop.shuffle.*; @@ -34,7 +35,7 @@ public class HadoopContext { private GridKernalContext ctx; /** Hadoop configuration. */ - private GridHadoopConfiguration cfg; + private HadoopConfiguration cfg; /** Job tracker. */ private HadoopJobTracker jobTracker; @@ -53,7 +54,7 @@ public class HadoopContext { */ public HadoopContext( GridKernalContext ctx, - GridHadoopConfiguration cfg, + HadoopConfiguration cfg, HadoopJobTracker jobTracker, HadoopTaskExecutorAdapter taskExecutor, HadoopShuffle shuffle @@ -89,7 +90,7 @@ public class HadoopContext { * * @return Hadoop configuration. */ - public GridHadoopConfiguration configuration() { + public HadoopConfiguration configuration() { return cfg; } @@ -149,7 +150,7 @@ public class HadoopContext { if (locNodeId.equals(meta.submitNodeId())) return true; - GridHadoopMapReducePlan plan = meta.mapReducePlan(); + HadoopMapReducePlan plan = meta.mapReducePlan(); return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); } @@ -178,7 +179,7 @@ public class HadoopContext { /** * @return Map-reduce planner. */ - public GridHadoopMapReducePlanner planner() { + public HadoopMapReducePlanner planner() { return cfg.getMapReducePlanner(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java deleted file mode 100644 index 8655e14..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.counters.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop +counter group adapter. - */ -class HadoopCounterGroup implements CounterGroup { - /** Counters. */ - private final HadoopCounters cntrs; - - /** Group name. */ - private final String name; - - /** - * Creates new instance. - * - * @param cntrs Client counters instance. - * @param name Group name. - */ - HadoopCounterGroup(HadoopCounters cntrs, String name) { - this.cntrs = cntrs; - this.name = name; - } - - /** {@inheritDoc} */ - @Override public String getName() { - return name; - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return name; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void addCounter(Counter counter) { - addCounter(counter.getName(), counter.getDisplayName(), 0); - } - - /** {@inheritDoc} */ - @Override public Counter addCounter(String name, String displayName, long value) { - final Counter counter = cntrs.findCounter(this.name, name); - - counter.setValue(value); - - return counter; - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, String displayName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName, boolean create) { - return cntrs.findCounter(name, counterName, create); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String counterName) { - return cntrs.findCounter(name, counterName); - } - - /** {@inheritDoc} */ - @Override public int size() { - return cntrs.groupSize(name); - } - - /** {@inheritDoc} */ - @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { - for (final Counter counter : rightGroup) - cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); - } - - /** {@inheritDoc} */ - @Override public CounterGroupBase<Counter> getUnderlyingGroup() { - return this; - } - - /** {@inheritDoc} */ - @Override public Iterator<Counter> iterator() { - return cntrs.iterateGroup(name); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java deleted file mode 100644 index 39b9ba6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.counters.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop counters adapter. - */ -public class HadoopCounters extends Counters { - /** */ - private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>(); - - /** - * Creates new instance based on given counters. - * - * @param cntrs Counters to adapt. - */ - public HadoopCounters(GridHadoopCounters cntrs) { - for (HadoopCounter cntr : cntrs.all()) - if (cntr instanceof HadoopLongCounter) - this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup addGroup(CounterGroup grp) { - return addGroup(grp.getName(), grp.getDisplayName()); - } - - /** {@inheritDoc} */ - @Override public CounterGroup addGroup(String name, String displayName) { - return new HadoopCounterGroup(this, name); - } - - /** {@inheritDoc} */ - @Override public Counter findCounter(String grpName, String cntrName) { - return findCounter(grpName, cntrName, true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(Enum<?> key) { - return findCounter(key.getDeclaringClass().getName(), key.name(), true); - } - - /** {@inheritDoc} */ - @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { - return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); - } - - /** {@inheritDoc} */ - @Override public synchronized Iterable<String> getGroupNames() { - Collection<String> res = new HashSet<>(); - - for (HadoopCounter counter : cntrs.values()) - res.add(counter.group()); - - return res; - } - - /** {@inheritDoc} */ - @Override public Iterator<CounterGroup> iterator() { - final Iterator<String> iter = getGroupNames().iterator(); - - return new Iterator<CounterGroup>() { - @Override public boolean hasNext() { - return iter.hasNext(); - } - - @Override public CounterGroup next() { - if (!hasNext()) - throw new NoSuchElementException(); - - return new HadoopCounterGroup(HadoopCounters.this, iter.next()); - } - - @Override public void remove() { - throw new UnsupportedOperationException("not implemented"); - } - }; - } - - /** {@inheritDoc} */ - @Override public synchronized CounterGroup getGroup(String grpName) { - return new HadoopCounterGroup(this, grpName); - } - - /** {@inheritDoc} */ - @Override public synchronized int countCounters() { - return cntrs.size(); - } - - /** {@inheritDoc} */ - @Override public synchronized void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { - for (CounterGroup group : other) { - for (Counter counter : group) { - findCounter(group.getName(), counter.getName()).increment(counter.getValue()); - } - } - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object genericRight) { - if (!(genericRight instanceof HadoopCounters)) - return false; - - return cntrs.equals(((HadoopCounters) genericRight).cntrs); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return cntrs.hashCode(); - } - - /** {@inheritDoc} */ - @Override public void setWriteAllCounters(boolean snd) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean getWriteAllCounters() { - return true; - } - - /** {@inheritDoc} */ - @Override public Limits limits() { - return null; - } - - /** - * Returns size of a group. - * - * @param grpName Name of the group. - * @return amount of counters in the given group. - */ - public int groupSize(String grpName) { - int res = 0; - - for (HadoopCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - res++; - } - - return res; - } - - /** - * Returns counters iterator for specified group. - * - * @param grpName Name of the group to iterate. - * @return Counters iterator. - */ - public Iterator<Counter> iterateGroup(String grpName) { - Collection<Counter> grpCounters = new ArrayList<>(); - - for (HadoopLongCounter counter : cntrs.values()) { - if (grpName.equals(counter.group())) - grpCounters.add(new HadoopV2Counter(counter)); - } - - return grpCounters.iterator(); - } - - /** - * Find a counter in the group. - * - * @param grpName The name of the counter group. - * @param cntrName The name of the counter. - * @param create Create the counter if not found if true. - * @return The counter that was found or added or {@code null} if create is false. - */ - public Counter findCounter(String grpName, String cntrName, boolean create) { - T2<String, String> key = new T2<>(grpName, cntrName); - - HadoopLongCounter internalCntr = cntrs.get(key); - - if (internalCntr == null & create) { - internalCntr = new HadoopLongCounter(grpName,cntrName); - - cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); - } - - return internalCntr == null ? null : new HadoopV2Counter(internalCntr); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 438874a..77eb6d2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -29,7 +29,7 @@ import java.util.*; /** * Hadoop job info based on default Hadoop configuration. */ -public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { +public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** */ private static final long serialVersionUID = 5489900236464999951L; @@ -82,7 +82,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { } /** {@inheritDoc} */ - @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { try { Class<?> jobCls0 = jobCls; @@ -96,7 +96,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { } } - Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, HadoopDefaultJobInfo.class, + Constructor<?> constructor = jobCls0.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, IgniteLogger.class); return (HadoopJob)constructor.newInstance(jobId, this, log); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java index b4f2c87..27542a1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.util.*; import org.jetbrains.annotations.*; @@ -42,12 +44,12 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration configuration() { + @Override public HadoopConfiguration configuration() { return proc.config(); } /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { + @Override public HadoopJobId nextJobId() { if (busyLock.enterBusy()) { try { return proc.nextJobId(); @@ -61,7 +63,7 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { if (busyLock.enterBusy()) { try { return proc.submit(jobId, jobInfo); @@ -75,7 +77,7 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.status(jobId); @@ -89,7 +91,7 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.counters(jobId); @@ -103,7 +105,7 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + @Nullable @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.finishFuture(jobId); @@ -117,7 +119,7 @@ public class HadoopImpl implements Hadoop { } /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { if (busyLock.enterBusy()) { try { return proc.kill(jobId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java new file mode 100644 index 0000000..b0c2d3e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounterGroup.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop +counter group adapter. + */ +class HadoopMapReduceCounterGroup implements CounterGroup { + /** Counters. */ + private final HadoopMapReduceCounters cntrs; + + /** Group name. */ + private final String name; + + /** + * Creates new instance. + * + * @param cntrs Client counters instance. + * @param name Group name. + */ + HadoopMapReduceCounterGroup(HadoopMapReduceCounters cntrs, String name) { + this.cntrs = cntrs; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String getName() { + return name; + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return name; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void addCounter(Counter counter) { + addCounter(counter.getName(), counter.getDisplayName(), 0); + } + + /** {@inheritDoc} */ + @Override public Counter addCounter(String name, String displayName, long value) { + final Counter counter = cntrs.findCounter(this.name, name); + + counter.setValue(value); + + return counter; + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, String displayName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName, boolean create) { + return cntrs.findCounter(name, counterName, create); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String counterName) { + return cntrs.findCounter(name, counterName); + } + + /** {@inheritDoc} */ + @Override public int size() { + return cntrs.groupSize(name); + } + + /** {@inheritDoc} */ + @Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) { + for (final Counter counter : rightGroup) + cntrs.findCounter(name, counter.getName()).increment(counter.getValue()); + } + + /** {@inheritDoc} */ + @Override public CounterGroupBase<Counter> getUnderlyingGroup() { + return this; + } + + /** {@inheritDoc} */ + @Override public Iterator<Counter> iterator() { + return cntrs.iterateGroup(name); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java new file mode 100644 index 0000000..c2c9e2a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceCounters.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.counters.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop counters adapter. + */ +public class HadoopMapReduceCounters extends Counters { + /** */ + private final Map<T2<String,String>,HadoopLongCounter> cntrs = new HashMap<>(); + + /** + * Creates new instance based on given counters. + * + * @param cntrs Counters to adapt. + */ + public HadoopMapReduceCounters(org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters cntrs) { + for (HadoopCounter cntr : cntrs.all()) + if (cntr instanceof HadoopLongCounter) + this.cntrs.put(new T2<>(cntr.group(), cntr.name()), (HadoopLongCounter) cntr); + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup addGroup(CounterGroup grp) { + return addGroup(grp.getName(), grp.getDisplayName()); + } + + /** {@inheritDoc} */ + @Override public CounterGroup addGroup(String name, String displayName) { + return new HadoopMapReduceCounterGroup(this, name); + } + + /** {@inheritDoc} */ + @Override public Counter findCounter(String grpName, String cntrName) { + return findCounter(grpName, cntrName, true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(Enum<?> key) { + return findCounter(key.getDeclaringClass().getName(), key.name(), true); + } + + /** {@inheritDoc} */ + @Override public synchronized Counter findCounter(String scheme, FileSystemCounter key) { + return findCounter(String.format("FileSystem Counter (%s)", scheme), key.name()); + } + + /** {@inheritDoc} */ + @Override public synchronized Iterable<String> getGroupNames() { + Collection<String> res = new HashSet<>(); + + for (HadoopCounter counter : cntrs.values()) + res.add(counter.group()); + + return res; + } + + /** {@inheritDoc} */ + @Override public Iterator<CounterGroup> iterator() { + final Iterator<String> iter = getGroupNames().iterator(); + + return new Iterator<CounterGroup>() { + @Override public boolean hasNext() { + return iter.hasNext(); + } + + @Override public CounterGroup next() { + if (!hasNext()) + throw new NoSuchElementException(); + + return new HadoopMapReduceCounterGroup(HadoopMapReduceCounters.this, iter.next()); + } + + @Override public void remove() { + throw new UnsupportedOperationException("not implemented"); + } + }; + } + + /** {@inheritDoc} */ + @Override public synchronized CounterGroup getGroup(String grpName) { + return new HadoopMapReduceCounterGroup(this, grpName); + } + + /** {@inheritDoc} */ + @Override public synchronized int countCounters() { + return cntrs.size(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public synchronized void incrAllCounters(AbstractCounters<Counter, CounterGroup> other) { + for (CounterGroup group : other) { + for (Counter counter : group) { + findCounter(group.getName(), counter.getName()).increment(counter.getValue()); + } + } + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object genericRight) { + if (!(genericRight instanceof HadoopMapReduceCounters)) + return false; + + return cntrs.equals(((HadoopMapReduceCounters) genericRight).cntrs); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return cntrs.hashCode(); + } + + /** {@inheritDoc} */ + @Override public void setWriteAllCounters(boolean snd) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean getWriteAllCounters() { + return true; + } + + /** {@inheritDoc} */ + @Override public Limits limits() { + return null; + } + + /** + * Returns size of a group. + * + * @param grpName Name of the group. + * @return amount of counters in the given group. + */ + public int groupSize(String grpName) { + int res = 0; + + for (HadoopCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + res++; + } + + return res; + } + + /** + * Returns counters iterator for specified group. + * + * @param grpName Name of the group to iterate. + * @return Counters iterator. + */ + public Iterator<Counter> iterateGroup(String grpName) { + Collection<Counter> grpCounters = new ArrayList<>(); + + for (HadoopLongCounter counter : cntrs.values()) { + if (grpName.equals(counter.group())) + grpCounters.add(new HadoopV2Counter(counter)); + } + + return grpCounters.iterator(); + } + + /** + * Find a counter in the group. + * + * @param grpName The name of the counter group. + * @param cntrName The name of the counter. + * @param create Create the counter if not found if true. + * @return The counter that was found or added or {@code null} if create is false. + */ + public Counter findCounter(String grpName, String cntrName, boolean create) { + T2<String, String> key = new T2<>(grpName, cntrName); + + HadoopLongCounter internalCntr = cntrs.get(key); + + if (internalCntr == null & create) { + internalCntr = new HadoopLongCounter(grpName,cntrName); + + cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); + } + + return internalCntr == null ? null : new HadoopV2Counter(internalCntr); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java index 75e55fd..f17ce66 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; import org.apache.ignite.internal.processors.hadoop.planner.*; import org.apache.ignite.internal.processors.hadoop.shuffle.*; @@ -35,7 +37,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*; /** * Hadoop processor. */ -public class HadoopProcessor extends IgniteHadoopProcessorAdapter { +public class HadoopProcessor extends HadoopProcessorAdapter { /** Job ID counter. */ private final AtomicInteger idCtr = new AtomicInteger(); @@ -59,12 +61,12 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { if (ctx.isDaemon()) return; - GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); + HadoopConfiguration cfg = ctx.config().getHadoopConfiguration(); if (cfg == null) - cfg = new GridHadoopConfiguration(); + cfg = new HadoopConfiguration(); else - cfg = new GridHadoopConfiguration(cfg); + cfg = new HadoopConfiguration(cfg); initializeDefaults(cfg); @@ -167,37 +169,37 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration config() { + @Override public HadoopConfiguration config() { return hctx.configuration(); } /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); + @Override public HadoopJobId nextJobId() { + return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet()); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { return hctx.jobTracker().submit(jobId, jobInfo); } /** {@inheritDoc} */ - @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().status(jobId); } /** {@inheritDoc} */ - @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { + @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().jobCounters(jobId); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().finishFuture(jobId); } /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { return hctx.jobTracker().killJob(jobId); } @@ -206,7 +208,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { * * @param cfg Hadoop configuration. */ - private void initializeDefaults(GridHadoopConfiguration cfg) { + private void initializeDefaults(HadoopConfiguration cfg) { if (cfg.getMapReducePlanner() == null) cfg.setMapReducePlanner(new HadoopDefaultMapReducePlanner()); } @@ -217,7 +219,7 @@ public class HadoopProcessor extends IgniteHadoopProcessorAdapter { * @param hadoopCfg Hadoop configuration. * @throws IgniteCheckedException If failed. */ - private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException { + private void validate(HadoopConfiguration hadoopCfg) throws IgniteCheckedException { if (ctx.config().isPeerClassLoadingEnabled()) throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " + "GridConfiguration.setPeerClassLoadingEnabled())."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 62b5a98..00be422 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -103,7 +103,7 @@ public class HadoopUtils { * @param status Ignite job status. * @return Hadoop job status. */ - public static JobStatus status(GridHadoopJobStatus status, Configuration conf) { + public static JobStatus status(HadoopJobStatus status, Configuration conf) { JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); float setupProgress = 0; @@ -281,7 +281,7 @@ public class HadoopUtils { * @return Working directory for job. * @throws IgniteCheckedException If Failed. */ - public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException { + public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException { return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); } @@ -293,7 +293,7 @@ public class HadoopUtils { * @return Working directory for task. * @throws IgniteCheckedException If Failed. */ - public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException { + public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException { File jobLocDir = jobLocalDir(locNodeId, info.jobId()); return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java index 4b96f7d..c2ed5bb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.hadoop.counter; -import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java index bfd59ef..78e1c26 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop.counter; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; @@ -31,7 +30,7 @@ import java.util.concurrent.*; /** * Default in-memory counters store. */ -public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { +public class HadoopCountersImpl implements HadoopCounters, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -59,7 +58,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { * * @param cntrs Counters to copy. */ - public HadoopCountersImpl(GridHadoopCounters cntrs) { + public HadoopCountersImpl(HadoopCounters cntrs) { this(cntrs.all()); } @@ -131,7 +130,7 @@ public class HadoopCountersImpl implements GridHadoopCounters, Externalizable { } /** {@inheritDoc} */ - @Override public void merge(GridHadoopCounters other) { + @Override public void merge(HadoopCounters other) { for (HadoopCounter counter : other.all()) counter(counter.group(), counter.name(), counter.getClass()).merge(counter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java index d926706..ce86edb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.hadoop.counter; -import org.apache.ignite.internal.processors.hadoop.*; - import java.io.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java index 6f57ae4..351839a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopPerformanceCounter.java @@ -117,7 +117,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param evtType The type of the event. * @return String contains necessary event information. */ - private String eventName(GridHadoopTaskInfo info, String evtType) { + private String eventName(HadoopTaskInfo info, String evtType) { return eventName(info.type().toString(), info.taskNumber(), evtType); } @@ -141,7 +141,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param info Task info. * @param ts Timestamp of the event. */ - public void onTaskSubmit(GridHadoopTaskInfo info, long ts) { + public void onTaskSubmit(HadoopTaskInfo info, long ts) { evts.add(new T2<>(eventName(info, "submit"), ts)); } @@ -151,7 +151,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param info Task info. * @param ts Timestamp of the event. */ - public void onTaskPrepare(GridHadoopTaskInfo info, long ts) { + public void onTaskPrepare(HadoopTaskInfo info, long ts) { evts.add(new T2<>(eventName(info, "prepare"), ts)); } @@ -161,7 +161,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param info Task info. * @param ts Timestamp of the event. */ - public void onTaskFinish(GridHadoopTaskInfo info, long ts) { + public void onTaskFinish(HadoopTaskInfo info, long ts) { if (info.type() == HadoopTaskType.REDUCE && lastShuffleMsg != null) { evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); @@ -178,7 +178,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param info Task info. * @param ts Timestamp of the event. */ - public void onTaskStart(GridHadoopTaskInfo info, long ts) { + public void onTaskStart(HadoopTaskInfo info, long ts) { evts.add(new T2<>(eventName(info, "start"), ts)); } @@ -209,7 +209,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * * @param info Job info. */ - public void clientSubmissionEvents(GridHadoopJobInfo info) { + public void clientSubmissionEvents(HadoopJobInfo info) { assert nodeId != null; addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY); @@ -224,7 +224,7 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { * @param info Job info. * @param propName Property name to get timestamp. */ - private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) { + private void addEventFromProperty(String evt, HadoopJobInfo info, String propName) { String val = info.property(propName); if (!F.isEmpty(val)) { @@ -253,13 +253,13 @@ public class HadoopPerformanceCounter extends HadoopCounterAdapter { } /** - * Gets system predefined performance counter from the GridHadoopCounters object. + * Gets system predefined performance counter from the HadoopCounters object. * - * @param cntrs GridHadoopCounters object. + * @param cntrs HadoopCounters object. * @param nodeId Node id for methods that adds events. It may be null if you don't use ones. * @return Predefined performance counter. */ - public static HadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) { + public static HadoopPerformanceCounter getCounter(HadoopCounters cntrs, @Nullable UUID nodeId) { HadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, HadoopPerformanceCounter.class); if (nodeId != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java index 35fd27c..0c29454 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsJclLogger.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.hadoop.igfs; import org.apache.commons.logging.*; import org.apache.ignite.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; /** @@ -26,6 +28,7 @@ import org.jetbrains.annotations.*; */ public class HadoopIgfsJclLogger implements IgniteLogger { /** JCL implementation proxy. */ + @GridToStringInclude private Log impl; /** @@ -107,6 +110,6 @@ public class HadoopIgfsJclLogger implements IgniteLogger { /** {@inheritDoc} */ @Override public String toString() { - return "IgfsHadoopJclLogger [impl=" + impl + ']'; + return S.toString(HadoopIgfsJclLogger.class, this); } }