http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java deleted file mode 100644 index 56c6913..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReducePlanner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Map-reduce execution planner. - */ -public interface GridHadoopMapReducePlanner { - /** - * Prepares map-reduce execution plan for the given job and topology. - * - * @param job Job. - * @param top Topology. - * @param oldPlan Old plan in case of partial failure. - * @return Map reduce plan. - */ - public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException; -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java deleted file mode 100644 index fcde424..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPartitioner.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * Partitioner. - */ -public interface GridHadoopPartitioner { - /** - * Gets partition which is actually a reducer index for the given key and value pair. - * - * @param key Key. - * @param val Value. - * @param parts Number of partitions. - * @return Partition. - */ - public int partition(Object key, Object val, int parts); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java deleted file mode 100644 index 5bc8806..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerialization.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized. - */ -public interface GridHadoopSerialization extends AutoCloseable { - /** - * Writes the given object to output. - * - * @param out Output. - * @param obj Object to serialize. - * @throws IgniteCheckedException If failed. - */ - public void write(DataOutput out, Object obj) throws IgniteCheckedException; - - /** - * Reads object from the given input optionally reusing given instance. - * - * @param in Input. - * @param obj Object. - * @return New object or reused instance. - * @throws IgniteCheckedException If failed. - */ - public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException; - - /** - * Finalise the internal objects. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java deleted file mode 100644 index be34f81..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTask.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.io.*; - -/** - * Hadoop task. - */ -public abstract class GridHadoopTask { - /** */ - private GridHadoopTaskInfo taskInfo; - - /** - * Creates task. - * - * @param taskInfo Task info. - */ - protected GridHadoopTask(GridHadoopTaskInfo taskInfo) { - assert taskInfo != null; - - this.taskInfo = taskInfo; - } - - /** - * For {@link Externalizable}. - */ - @SuppressWarnings("ConstructorNotProtectedInAbstractClass") - public GridHadoopTask() { - // No-op. - } - - /** - * Gets task info. - * - * @return Task info. - */ - public GridHadoopTaskInfo info() { - return taskInfo; - } - - /** - * Runs task. - * - * @param taskCtx Context. - * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException; - - /** - * Interrupts task execution. - */ - public abstract void cancel(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java deleted file mode 100644 index bedd93b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskContext.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Task context. - */ -public abstract class GridHadoopTaskContext { - /** */ - private final GridHadoopJob job; - - /** */ - private GridHadoopTaskInput input; - - /** */ - private GridHadoopTaskOutput output; - - /** */ - private GridHadoopTaskInfo taskInfo; - - /** - * @param taskInfo Task info. - * @param job Job. - */ - protected GridHadoopTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job) { - this.taskInfo = taskInfo; - this.job = job; - } - - /** - * Gets task info. - * - * @return Task info. - */ - public GridHadoopTaskInfo taskInfo() { - return taskInfo; - } - - /** - * Set a new task info. - * - * @param info Task info. - */ - public void taskInfo(GridHadoopTaskInfo info) { - taskInfo = info; - } - - /** - * Gets task output. - * - * @return Task output. - */ - public GridHadoopTaskOutput output() { - return output; - } - - /** - * Gets task input. - * - * @return Task input. - */ - public GridHadoopTaskInput input() { - return input; - } - - /** - * @return Job. - */ - public GridHadoopJob job() { - return job; - } - - /** - * Gets counter for the given name. - * - * @param grp Counter group's name. - * @param name Counter name. - * @return Counter. - */ - public abstract <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls); - - /** - * Gets all known counters. - * - * @return Unmodifiable collection of counters. - */ - public abstract GridHadoopCounters counters(); - - /** - * Sets input of the task. - * - * @param in Input. - */ - public void input(GridHadoopTaskInput in) { - input = in; - } - - /** - * Sets output of the task. - * - * @param out Output. - */ - public void output(GridHadoopTaskOutput out) { - output = out; - } - - /** - * Gets partitioner. - * - * @return Partitioner. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopPartitioner partitioner() throws IgniteCheckedException; - - /** - * Gets serializer for values. - * - * @return Serializer for keys. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopSerialization keySerialization() throws IgniteCheckedException; - - /** - * Gets serializer for values. - * - * @return Serializer for values. - * @throws IgniteCheckedException If failed. - */ - public abstract GridHadoopSerialization valueSerialization() throws IgniteCheckedException; - - /** - * Gets sorting comparator. - * - * @return Comparator for sorting. - */ - public abstract Comparator<Object> sortComparator(); - - /** - * Gets comparator for grouping on combine or reduce operation. - * - * @return Comparator. - */ - public abstract Comparator<Object> groupComparator(); - - /** - * Execute current task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void run() throws IgniteCheckedException; - - /** - * Cancel current task execution. - */ - public abstract void cancel(); - - /** - * Prepare local environment for the task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void prepareTaskEnvironment() throws IgniteCheckedException; - - /** - * Cleans up local environment of the task. - * - * @throws IgniteCheckedException If failed. - */ - public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java deleted file mode 100644 index 75e06ca..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInfo.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task info. - */ -public class GridHadoopTaskInfo implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridHadoopTaskType type; - - /** */ - private GridHadoopJobId jobId; - - /** */ - private int taskNum; - - /** */ - private int attempt; - - /** */ - private GridHadoopInputSplit inputSplit; - - /** - * For {@link Externalizable}. - */ - public GridHadoopTaskInfo() { - // No-op. - } - - /** - * Creates new task info. - * - * @param type Task type. - * @param jobId Job id. - * @param taskNum Task number. - * @param attempt Attempt for this task. - * @param inputSplit Input split. - */ - public GridHadoopTaskInfo(GridHadoopTaskType type, GridHadoopJobId jobId, int taskNum, int attempt, - @Nullable GridHadoopInputSplit inputSplit) { - this.type = type; - this.jobId = jobId; - this.taskNum = taskNum; - this.attempt = attempt; - this.inputSplit = inputSplit; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeByte(type.ordinal()); - out.writeObject(jobId); - out.writeInt(taskNum); - out.writeInt(attempt); - out.writeObject(inputSplit); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - type = GridHadoopTaskType.fromOrdinal(in.readByte()); - jobId = (GridHadoopJobId)in.readObject(); - taskNum = in.readInt(); - attempt = in.readInt(); - inputSplit = (GridHadoopInputSplit)in.readObject(); - } - - /** - * @return Type. - */ - public GridHadoopTaskType type() { - return type; - } - - /** - * @return Job id. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Task number. - */ - public int taskNumber() { - return taskNum; - } - - /** - * @return Attempt. - */ - public int attempt() { - return attempt; - } - - /** - * @return Input split. - */ - @Nullable public GridHadoopInputSplit inputSplit() { - return inputSplit; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (!(o instanceof GridHadoopTaskInfo)) - return false; - - GridHadoopTaskInfo that = (GridHadoopTaskInfo)o; - - return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = type.hashCode(); - - res = 31 * res + jobId.hashCode(); - res = 31 * res + taskNum; - res = 31 * res + attempt; - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskInfo.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java deleted file mode 100644 index 479cf6d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskInput.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Task input. - */ -public interface GridHadoopTaskInput extends AutoCloseable { - /** - * Moves cursor to the next element. - * - * @return {@code false} If input is exceeded. - */ - boolean next(); - - /** - * Gets current key. - * - * @return Key. - */ - Object key(); - - /** - * Gets values for current key. - * - * @return Values. - */ - Iterator<?> values(); - - /** - * Closes input. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java deleted file mode 100644 index 6480d8d..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskOutput.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * Task output. - */ -public interface GridHadoopTaskOutput extends AutoCloseable { - /** - * Writes key and value to the output. - * - * @param key Key. - * @param val Value. - */ - public void write(Object key, Object val) throws IgniteCheckedException; - - /** - * Closes output. - * - * @throws IgniteCheckedException If failed. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java deleted file mode 100644 index 404d6b8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskType.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jetbrains.annotations.*; - -/** -* Task type. -*/ -public enum GridHadoopTaskType { - /** Setup task. */ - SETUP, - - /** Map task. */ - MAP, - - /** Reduce task. */ - REDUCE, - - /** Combine task. */ - COMBINE, - - /** Commit task. */ - COMMIT, - - /** Abort task. */ - ABORT; - - /** Enumerated values. */ - private static final GridHadoopTaskType[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static GridHadoopTaskType fromOrdinal(byte ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java new file mode 100644 index 0000000..9efc4a9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/Hadoop.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.jetbrains.annotations.*; + +/** + * Hadoop facade providing access to Ignite Hadoop features. + */ +public interface Hadoop { + /** + * Gets Hadoop module configuration. + * + * @return Hadoop module configuration. + */ + public HadoopConfiguration configuration(); + + /** + * Generate next job ID. + * + * @return Next job ID. + */ + public HadoopJobId nextJobId(); + + /** + * Submits job to job tracker. + * + * @param jobId Job ID to submit. + * @param jobInfo Job info to submit. + * @return Execution future. + */ + public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo); + + /** + * Gets Hadoop job execution status. + * + * @param jobId Job ID to get status for. + * @return Job execution status or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Returns job counters. + * + * @param jobId Job ID to get counters for. + * @return Job counters object. + * @throws IgniteCheckedException If failed. + */ + public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Gets Hadoop finish future for particular job. + * + * @param jobId Job ID. + * @return Job finish future or {@code null} in case job with the given ID is not found. + * @throws IgniteCheckedException If failed. + */ + @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public boolean kill(HadoopJobId jobId) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java new file mode 100644 index 0000000..223e572 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Hadoop file block. + */ +public class HadoopFileBlock extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + protected URI file; + + /** */ + @GridToStringInclude + protected long start; + + /** */ + @GridToStringInclude + protected long len; + + /** + * Creates new file block. + */ + public HadoopFileBlock() { + // No-op. + } + + /** + * Creates new file block. + * + * @param hosts List of hosts where the block resides. + * @param file File URI. + * @param start Start position of the block in the file. + * @param len Length of the block. + */ + public HadoopFileBlock(String[] hosts, URI file, long start, long len) { + A.notNull(hosts, "hosts", file, "file"); + + this.hosts = hosts; + this.file = file; + this.start = start; + this.len = len; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(file()); + out.writeLong(start()); + out.writeLong(length()); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + file = (URI)in.readObject(); + start = in.readLong(); + len = in.readLong(); + } + + /** + * @return Length. + */ + public long length() { + return len; + } + + /** + * @param len New length. + */ + public void length(long len) { + this.len = len; + } + + /** + * @return Start. + */ + public long start() { + return start; + } + + /** + * @param start New start. + */ + public void start(long start) { + this.start = start; + } + + /** + * @return File. + */ + public URI file() { + return file; + } + + /** + * @param file New file. + */ + public void file(URI file) { + this.file = file; + } + + /** + * @param hosts New hosts. + */ + public void hosts(String[] hosts) { + A.notNull(hosts, "hosts"); + + this.hosts = hosts; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopFileBlock)) + return false; + + HadoopFileBlock that = (HadoopFileBlock)o; + + return len == that.len && start == that.start && file.equals(that.file); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = file.hashCode(); + + res = 31 * res + (int)(start ^ (start >>> 32)); + res = 31 * res + (int)(len ^ (len >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(HadoopFileBlock.class, this, "hosts", Arrays.toString(hosts)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java new file mode 100644 index 0000000..0c94012 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.*; + +/** + * Abstract fragment of an input data source. + */ +public abstract class HadoopInputSplit implements Externalizable { + /** */ + protected String[] hosts; + + /** + * Array of hosts where this input split resides. + * + * @return Hosts. + */ + public String[] hosts() { + assert hosts != null; + + return hosts; + } + + /** + * This method must be implemented for purpose of internal implementation. + * + * @param obj Another object. + * @return {@code true} If objects are equal. + */ + @Override public abstract boolean equals(Object obj); + + /** + * This method must be implemented for purpose of internal implementation. + * + * @return Hash code of the object. + */ + @Override public abstract int hashCode(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java new file mode 100644 index 0000000..65cb48d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Hadoop job. + */ +public interface HadoopJob { + /** + * Gets job ID. + * + * @return Job ID. + */ + public HadoopJobId id(); + + /** + * Gets job information. + * + * @return Job information. + */ + public HadoopJobInfo info(); + + /** + * Gets collection of input splits for this job. + * + * @return Input splits. + */ + public Collection<HadoopInputSplit> input() throws IgniteCheckedException; + + /** + * Returns context for task execution. + * + * @param info Task info. + * @return Task Context. + * @throws IgniteCheckedException If failed. + */ + public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Does all the needed initialization for the job. Will be called on each node where tasks for this job must + * be executed. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @param locNodeId Local node ID. + * @throws IgniteCheckedException If failed. + */ + public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException; + + /** + * Release all the resources. + * <p> + * If job is running in external mode this method will be called on instance in Ignite node with parameter + * {@code false} and on instance in external process with parameter {@code true}. + * + * @param external If {@code true} then this job instance resides in external process. + * @throws IgniteCheckedException If failed. + */ + public void dispose(boolean external) throws IgniteCheckedException; + + /** + * Prepare local environment for the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @param info Task info. + * @throws IgniteCheckedException If failed. + */ + public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException; + + /** + * Cleans up the job staging directory. + */ + void cleanupStagingDirectory(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java new file mode 100644 index 0000000..b0593a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Job ID. + */ +public class HadoopJobId implements GridCacheInternal, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** */ + private int jobId; + + /** + * For {@link Externalizable}. + */ + public HadoopJobId() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param jobId Job ID. + */ + public HadoopJobId(UUID nodeId, int jobId) { + this.nodeId = nodeId; + this.jobId = jobId; + } + + public UUID globalId() { + return nodeId; + } + + public int localId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, nodeId); + out.writeInt(jobId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeId = U.readUuid(in); + jobId = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopJobId that = (HadoopJobId) o; + + if (jobId != that.jobId) + return false; + + if (!nodeId.equals(that.nodeId)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * nodeId.hashCode() + jobId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return nodeId + "_" + jobId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java new file mode 100644 index 0000000..51faf5d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Compact job description. + */ +public interface HadoopJobInfo extends Serializable { + /** + * Gets optional configuration property for the job. + * + * @param name Property name. + * @return Value or {@code null} if none. + */ + @Nullable public String property(String name); + + /** + * Checks whether job has combiner. + * + * @return {@code true} If job has combiner. + */ + public boolean hasCombiner(); + + /** + * Checks whether job has reducer. + * Actual number of reducers will be in {@link HadoopMapReducePlan#reducers()}. + * + * @return Number of reducer. + */ + public boolean hasReducer(); + + /** + * Creates new job instance for the given ID. + * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution. + * This method will be called once for the same ID on one node, though it can be called on the same host + * multiple times from different processes (in case of multiple nodes on the same host or external execution). + * + * @param jobId Job ID. + * @param log Logger. + * @return Job. + * @throws IgniteCheckedException If failed. + */ + HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException; + + /** + * @return Number of reducers configured for job. + */ + public int reducers(); + + /** + * Gets job name. + * + * @return Job name. + */ + public String jobName(); + + /** + * Gets user name. + * + * @return User name. + */ + public String user(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java new file mode 100644 index 0000000..8c932bb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobPhase.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Job run phase. + */ +public enum HadoopJobPhase { + /** Job is running setup task. */ + PHASE_SETUP, + + /** Job is running map and combine tasks. */ + PHASE_MAP, + + /** Job has finished all map tasks and running reduce tasks. */ + PHASE_REDUCE, + + /** Job is stopping due to exception during any of the phases. */ + PHASE_CANCELLING, + + /** Job has finished execution. */ + PHASE_COMPLETE +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java new file mode 100644 index 0000000..1a58624 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.*; + +/** + * Enumeration of optional properties supported by Ignite for Apache Hadoop. + */ +public enum HadoopJobProperty { + /** + * Initial size for hashmap which stores output of mapper and will be used as input of combiner. + * <p> + * Setting it right allows to avoid rehashing. + */ + COMBINER_HASHMAP_SIZE, + + /** + * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. + * <p> + * Setting it right allows to avoid rehashing. + */ + PARTITION_HASHMAP_SIZE, + + /** + * Specifies number of concurrently running mappers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_MAPPERS, + + /** + * Specifies number of concurrently running reducers for external execution mode. + * <p> + * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. + */ + EXTERNAL_CONCURRENT_REDUCERS, + + /** + * Delay in milliseconds after which Ignite server will reply job status. + */ + JOB_STATUS_POLL_DELAY, + + /** + * Size in bytes of single memory page which will be allocated for data structures in shuffle. + * <p> + * By default is {@code 32 * 1024}. + */ + SHUFFLE_OFFHEAP_PAGE_SIZE, + + /** + * If set to {@code true} then input for combiner will not be sorted by key. + * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_COMBINER_NO_SORTING, + + /** + * If set to {@code true} then input for reducer will not be sorted by key. + * Internally hash-map will be used instead of sorted one, so {@link Object#equals(Object)} + * and {@link Object#hashCode()} methods of key must be implemented consistently with + * comparator for that type. Grouping comparator is not supported if this setting is {@code true}. + * <p> + * By default is {@code false}. + */ + SHUFFLE_REDUCER_NO_SORTING; + + /** */ + private final String ptyName; + + /** + * + */ + HadoopJobProperty() { + ptyName = "ignite." + name().toLowerCase().replace('_', '.'); + } + + /** + * @return Property name. + */ + public String propertyName() { + return ptyName; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static String get(HadoopJobInfo jobInfo, HadoopJobProperty pty, @Nullable String dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : res; + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static int get(HadoopJobInfo jobInfo, HadoopJobProperty pty, int dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : Integer.parseInt(res); + } + + /** + * @param jobInfo Job info. + * @param pty Property. + * @param dflt Default value. + * @return Property value. + */ + public static boolean get(HadoopJobInfo jobInfo, HadoopJobProperty pty, boolean dflt) { + String res = jobInfo.property(pty.propertyName()); + + return res == null ? dflt : Boolean.parseBoolean(res); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java new file mode 100644 index 0000000..752556d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobStatus.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Hadoop job status. + */ +public class HadoopJobStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private HadoopJobId jobId; + + /** Job name. */ + private String jobName; + + /** User. */ + private String usr; + + /** Pending mappers count. */ + private int pendingMapperCnt; + + /** Pending reducers count. */ + private int pendingReducerCnt; + + /** Total mappers count. */ + private int totalMapperCnt; + + /** Total reducers count. */ + private int totalReducerCnt; + /** Phase. */ + private HadoopJobPhase jobPhase; + + /** */ + private boolean failed; + + /** Version. */ + private long ver; + + /** + * {@link Externalizable} support. + */ + public HadoopJobStatus() { + // No-op. + } + + /** + * Constructor. + * + * @param jobId Job ID. + * @param jobName Job name. + * @param usr User. + * @param pendingMapperCnt Pending mappers count. + * @param pendingReducerCnt Pending reducers count. + * @param totalMapperCnt Total mappers count. + * @param totalReducerCnt Total reducers count. + * @param jobPhase Job phase. + * @param failed Failed. + * @param ver Version. + */ + public HadoopJobStatus( + HadoopJobId jobId, + String jobName, + String usr, + int pendingMapperCnt, + int pendingReducerCnt, + int totalMapperCnt, + int totalReducerCnt, + HadoopJobPhase jobPhase, + boolean failed, + long ver + ) { + this.jobId = jobId; + this.jobName = jobName; + this.usr = usr; + this.pendingMapperCnt = pendingMapperCnt; + this.pendingReducerCnt = pendingReducerCnt; + this.totalMapperCnt = totalMapperCnt; + this.totalReducerCnt = totalReducerCnt; + this.jobPhase = jobPhase; + this.failed = failed; + this.ver = ver; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Job name. + */ + public String jobName() { + return jobName; + } + + /** + * @return User. + */ + public String user() { + return usr; + } + + /** + * @return Pending mappers count. + */ + public int pendingMapperCnt() { + return pendingMapperCnt; + } + + /** + * @return Pending reducers count. + */ + public int pendingReducerCnt() { + return pendingReducerCnt; + } + + /** + * @return Total mappers count. + */ + public int totalMapperCnt() { + return totalMapperCnt; + } + + /** + * @return Total reducers count. + */ + public int totalReducerCnt() { + return totalReducerCnt; + } + + /** + * @return Version. + */ + public long version() { + return ver; + } + + /** + * @return Job phase. + */ + public HadoopJobPhase jobPhase() { + return jobPhase; + } + + /** + * @return {@code true} If the job failed. + */ + public boolean isFailed() { + return failed; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopJobStatus.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(jobId); + U.writeString(out, jobName); + U.writeString(out, usr); + out.writeInt(pendingMapperCnt); + out.writeInt(pendingReducerCnt); + out.writeInt(totalMapperCnt); + out.writeInt(totalReducerCnt); + out.writeObject(jobPhase); + out.writeBoolean(failed); + out.writeLong(ver); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = (HadoopJobId)in.readObject(); + jobName = U.readString(in); + usr = U.readString(in); + pendingMapperCnt = in.readInt(); + pendingReducerCnt = in.readInt(); + totalMapperCnt = in.readInt(); + totalReducerCnt = in.readInt(); + jobPhase = (HadoopJobPhase)in.readObject(); + failed = in.readBoolean(); + ver = in.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java new file mode 100644 index 0000000..3da2fb1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Map-reduce job execution plan. + */ +public interface HadoopMapReducePlan extends Serializable { + /** + * Gets collection of file blocks for which mappers should be executed. + * + * @param nodeId Node ID to check. + * @return Collection of file blocks or {@code null} if no mappers should be executed on given node. + */ + @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId); + + /** + * Gets reducer IDs that should be started on given node. + * + * @param nodeId Node ID to check. + * @return Array of reducer IDs. + */ + @Nullable public int[] reducers(UUID nodeId); + + /** + * Gets collection of all node IDs involved in map part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> mapperNodeIds(); + + /** + * Gets collection of all node IDs involved in reduce part of job execution. + * + * @return Collection of node IDs. + */ + public Collection<UUID> reducerNodeIds(); + + /** + * Gets overall number of mappers for the job. + * + * @return Number of mappers. + */ + public int mappers(); + + /** + * Gets overall number of reducers for the job. + * + * @return Number of reducers. + */ + public int reducers(); + + /** + * Gets node ID for reducer. + * + * @param reducer Reducer. + * @return Node ID. + */ + public UUID nodeForReducer(int reducer); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java new file mode 100644 index 0000000..ab885fe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Map-reduce execution planner. + */ +public interface HadoopMapReducePlanner { + /** + * Prepares map-reduce execution plan for the given job and topology. + * + * @param job Job. + * @param top Topology. + * @param oldPlan Old plan in case of partial failure. + * @return Map reduce plan. + */ + public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java new file mode 100644 index 0000000..eb84d00 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.util.future.*; + +/** + * Hadoop processor. + */ +public class HadoopNoopProcessor extends HadoopProcessorAdapter { + /** + * @param ctx Kernal context. + */ + public HadoopNoopProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public Hadoop hadoop() { + throw new IllegalStateException("Hadoop module is not found in class path."); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration config() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopJobId nextJobId() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) { + return new GridFinishedFutureEx<>(new IgniteCheckedException("Hadoop is not available.")); + } + + /** {@inheritDoc} */ + @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters(HadoopJobId jobId) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java new file mode 100644 index 0000000..ec94f81 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopPartitioner.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * Partitioner. + */ +public interface HadoopPartitioner { + /** + * Gets partition which is actually a reducer index for the given key and value pair. + * + * @param key Key. + * @param val Value. + * @param parts Number of partitions. + * @return Partition. + */ + public int partition(Object key, Object val, int parts); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java new file mode 100644 index 0000000..44ff8be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessorAdapter.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +/** + * Hadoop processor. + */ +public abstract class HadoopProcessorAdapter extends GridProcessorAdapter { + /** + * @param ctx Kernal context. + */ + protected HadoopProcessorAdapter(GridKernalContext ctx) { + super(ctx); + } + + /** + * @return Hadoop facade. + */ + public abstract Hadoop hadoop(); + + /** + * @return Hadoop configuration. + */ + public abstract HadoopConfiguration config(); + + /** + * @return Collection of generated IDs. + */ + public abstract HadoopJobId nextJobId(); + + /** + * Submits job to job tracker. + * + * @param jobId Job ID to submit. + * @param jobInfo Job info to submit. + * @return Execution future. + */ + public abstract IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo); + + /** + * Gets Hadoop job execution status. + * + * @param jobId Job ID to get status for. + * @return Job execution status. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Returns Hadoop job counters. + * + * @param jobId Job ID to get counters for. + * @return Job counters. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Gets Hadoop job finish future. + * + * @param jobId Job ID. + * @return Job finish future or {@code null}. + * @throws IgniteCheckedException If failed. + */ + public abstract IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * Kills job. + * + * @param jobId Job ID. + * @return {@code True} if job was killed. + * @throws IgniteCheckedException If failed. + */ + public abstract boolean kill(HadoopJobId jobId) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java new file mode 100644 index 0000000..aab803b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSerialization.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Hadoop serialization. Not thread safe object, must be created for each thread or correctly synchronized. + */ +public interface HadoopSerialization extends AutoCloseable { + /** + * Writes the given object to output. + * + * @param out Output. + * @param obj Object to serialize. + * @throws IgniteCheckedException If failed. + */ + public void write(DataOutput out, Object obj) throws IgniteCheckedException; + + /** + * Reads object from the given input optionally reusing given instance. + * + * @param in Input. + * @param obj Object. + * @return New object or reused instance. + * @throws IgniteCheckedException If failed. + */ + public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException; + + /** + * Finalise the internal objects. + * + * @throws IgniteCheckedException If failed. + */ + @Override public void close() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java new file mode 100644 index 0000000..3ce83ae --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTask.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +import java.io.*; + +/** + * Hadoop task. + */ +public abstract class HadoopTask { + /** */ + private HadoopTaskInfo taskInfo; + + /** + * Creates task. + * + * @param taskInfo Task info. + */ + protected HadoopTask(HadoopTaskInfo taskInfo) { + assert taskInfo != null; + + this.taskInfo = taskInfo; + } + + /** + * For {@link Externalizable}. + */ + @SuppressWarnings("ConstructorNotProtectedInAbstractClass") + public HadoopTask() { + // No-op. + } + + /** + * Gets task info. + * + * @return Task info. + */ + public HadoopTaskInfo info() { + return taskInfo; + } + + /** + * Runs task. + * + * @param taskCtx Context. + * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted. + * @throws IgniteCheckedException If failed. + */ + public abstract void run(HadoopTaskContext taskCtx) throws IgniteCheckedException; + + /** + * Interrupts task execution. + */ + public abstract void cancel(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java new file mode 100644 index 0000000..371fd81 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +import java.util.*; + +/** + * Task context. + */ +public abstract class HadoopTaskContext { + /** */ + private final HadoopJob job; + + /** */ + private HadoopTaskInput input; + + /** */ + private HadoopTaskOutput output; + + /** */ + private HadoopTaskInfo taskInfo; + + /** + * @param taskInfo Task info. + * @param job Job. + */ + protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) { + this.taskInfo = taskInfo; + this.job = job; + } + + /** + * Gets task info. + * + * @return Task info. + */ + public HadoopTaskInfo taskInfo() { + return taskInfo; + } + + /** + * Set a new task info. + * + * @param info Task info. + */ + public void taskInfo(HadoopTaskInfo info) { + taskInfo = info; + } + + /** + * Gets task output. + * + * @return Task output. + */ + public HadoopTaskOutput output() { + return output; + } + + /** + * Gets task input. + * + * @return Task input. + */ + public HadoopTaskInput input() { + return input; + } + + /** + * @return Job. + */ + public HadoopJob job() { + return job; + } + + /** + * Gets counter for the given name. + * + * @param grp Counter group's name. + * @param name Counter name. + * @return Counter. + */ + public abstract <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls); + + /** + * Gets all known counters. + * + * @return Unmodifiable collection of counters. + */ + public abstract HadoopCounters counters(); + + /** + * Sets input of the task. + * + * @param in Input. + */ + public void input(HadoopTaskInput in) { + input = in; + } + + /** + * Sets output of the task. + * + * @param out Output. + */ + public void output(HadoopTaskOutput out) { + output = out; + } + + /** + * Gets partitioner. + * + * @return Partitioner. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopPartitioner partitioner() throws IgniteCheckedException; + + /** + * Gets serializer for values. + * + * @return Serializer for keys. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopSerialization keySerialization() throws IgniteCheckedException; + + /** + * Gets serializer for values. + * + * @return Serializer for values. + * @throws IgniteCheckedException If failed. + */ + public abstract HadoopSerialization valueSerialization() throws IgniteCheckedException; + + /** + * Gets sorting comparator. + * + * @return Comparator for sorting. + */ + public abstract Comparator<Object> sortComparator(); + + /** + * Gets comparator for grouping on combine or reduce operation. + * + * @return Comparator. + */ + public abstract Comparator<Object> groupComparator(); + + /** + * Execute current task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void run() throws IgniteCheckedException; + + /** + * Cancel current task execution. + */ + public abstract void cancel(); + + /** + * Prepare local environment for the task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void prepareTaskEnvironment() throws IgniteCheckedException; + + /** + * Cleans up local environment of the task. + * + * @throws IgniteCheckedException If failed. + */ + public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java new file mode 100644 index 0000000..eb82cb4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task info. + */ +public class HadoopTaskInfo implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private HadoopTaskType type; + + /** */ + private HadoopJobId jobId; + + /** */ + private int taskNum; + + /** */ + private int attempt; + + /** */ + private HadoopInputSplit inputSplit; + + /** + * For {@link Externalizable}. + */ + public HadoopTaskInfo() { + // No-op. + } + + /** + * Creates new task info. + * + * @param type Task type. + * @param jobId Job id. + * @param taskNum Task number. + * @param attempt Attempt for this task. + * @param inputSplit Input split. + */ + public HadoopTaskInfo(HadoopTaskType type, HadoopJobId jobId, int taskNum, int attempt, + @Nullable HadoopInputSplit inputSplit) { + this.type = type; + this.jobId = jobId; + this.taskNum = taskNum; + this.attempt = attempt; + this.inputSplit = inputSplit; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte(type.ordinal()); + out.writeObject(jobId); + out.writeInt(taskNum); + out.writeInt(attempt); + out.writeObject(inputSplit); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + type = HadoopTaskType.fromOrdinal(in.readByte()); + jobId = (HadoopJobId)in.readObject(); + taskNum = in.readInt(); + attempt = in.readInt(); + inputSplit = (HadoopInputSplit)in.readObject(); + } + + /** + * @return Type. + */ + public HadoopTaskType type() { + return type; + } + + /** + * @return Job id. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Task number. + */ + public int taskNumber() { + return taskNum; + } + + /** + * @return Attempt. + */ + public int attempt() { + return attempt; + } + + /** + * @return Input split. + */ + @Nullable public HadoopInputSplit inputSplit() { + return inputSplit; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopTaskInfo)) + return false; + + HadoopTaskInfo that = (HadoopTaskInfo)o; + + return attempt == that.attempt && taskNum == that.taskNum && jobId.equals(that.jobId) && type == that.type; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = type.hashCode(); + + res = 31 * res + jobId.hashCode(); + res = 31 * res + taskNum; + res = 31 * res + attempt; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskInfo.class, this); + } +}