http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java deleted file mode 100644 index 342cbab..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlanner.java +++ /dev/null @@ -1,434 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.igfs.hadoop.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.IgniteFs.*; - -/** - * Default map-reduce planner implementation. - */ -public class GridHadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @SuppressWarnings("UnusedDeclaration") - @LoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { - // Convert collection of topology nodes to collection of topology node IDs. - Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); - - for (ClusterNode topNode : top) - topIds.add(topNode.id()); - - Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input()); - - int rdcCnt = job.info().reducers(); - - if (rdcCnt < 0) - throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); - - Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); - - return new GridHadoopDefaultMapReducePlan(mappers, reducers); - } - - /** - * Create plan for mappers. - * - * @param top Topology nodes. - * @param topIds Topology node IDs. - * @param splits Splits. - * @return Mappers map. - * @throws IgniteCheckedException If failed. - */ - private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, - Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException { - Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); - - Map<String, Collection<UUID>> nodes = hosts(top); - - Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. - - for (UUID nodeId : topIds) - nodeLoads.put(nodeId, 0); - - for (GridHadoopInputSplit split : splits) { - UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); - - if (log.isDebugEnabled()) - log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); - - Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId); - - if (nodeSplits == null) { - nodeSplits = new ArrayList<>(); - - mappers.put(nodeId, nodeSplits); - } - - nodeSplits.add(split); - - // Updated node load. - nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); - } - - return mappers; - } - - /** - * Groups nodes by host names. - * - * @param top Topology to group. - * @return Map. - */ - private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) { - Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); - - for (ClusterNode node : top) { - for (String host : node.hostNames()) { - Collection<UUID> nodeIds = grouped.get(host); - - if (nodeIds == null) { - // Expecting 1-2 nodes per host. - nodeIds = new ArrayList<>(2); - - grouped.put(host, nodeIds); - } - - nodeIds.add(node.id()); - } - } - - return grouped; - } - - /** - * Determine the best node for this split. - * - * @param split Split. - * @param topIds Topology node IDs. - * @param nodes Nodes. - * @param nodeLoads Node load tracker. - * @return Node ID. - */ - @SuppressWarnings("unchecked") - private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, - Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock split0 = (GridHadoopFileBlock)split; - - if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { - IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(split0.file().getAuthority()); - - IgfsEx igfs = null; - - if (F.eq(ignite.name(), endpoint.grid())) - igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); - - if (igfs != null && !igfs.isProxy(split0.file())) { - Collection<IgfsBlockLocation> blocks; - - try { - blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - - assert blocks != null; - - if (blocks.size() == 1) - // Fast-path, split consists of one IGFS block (as in most cases). - return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); - else { - // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. - Map<UUID, Long> nodeMap = new HashMap<>(); - - List<UUID> bestNodeIds = null; - long bestLen = -1L; - - for (IgfsBlockLocation block : blocks) { - for (UUID blockNodeId : block.nodeIds()) { - if (topIds.contains(blockNodeId)) { - Long oldLen = nodeMap.get(blockNodeId); - long newLen = oldLen == null ? block.length() : oldLen + block.length(); - - nodeMap.put(blockNodeId, newLen); - - if (bestNodeIds == null || bestLen < newLen) { - bestNodeIds = new ArrayList<>(1); - - bestNodeIds.add(blockNodeId); - - bestLen = newLen; - } - else if (bestLen == newLen) { - assert !F.isEmpty(bestNodeIds); - - bestNodeIds.add(blockNodeId); - } - } - } - } - - if (bestNodeIds != null) { - return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : - bestNode(bestNodeIds, topIds, nodeLoads, true); - } - } - } - } - } - - // Cannot use local IGFS for some reason, try selecting the node by host. - Collection<UUID> blockNodes = null; - - for (String host : split.hosts()) { - Collection<UUID> hostNodes = nodes.get(host); - - if (!F.isEmpty(hostNodes)) { - if (blockNodes == null) - blockNodes = new ArrayList<>(hostNodes); - else - blockNodes.addAll(hostNodes); - } - } - - return bestNode(blockNodes, topIds, nodeLoads, false); - } - - /** - * Finds the best (the least loaded) node among the candidates. - * - * @param candidates Candidates. - * @param topIds Topology node IDs. - * @param nodeLoads Known node loads. - * @param skipTopCheck Whether to skip topology check. - * @return The best node. - */ - private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, - boolean skipTopCheck) { - UUID bestNode = null; - int bestLoad = Integer.MAX_VALUE; - - if (candidates != null) { - for (UUID candidate : candidates) { - if (skipTopCheck || topIds.contains(candidate)) { - int load = nodeLoads.get(candidate); - - if (bestNode == null || bestLoad > load) { - bestNode = candidate; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - } - - if (bestNode == null) { - // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. - bestLoad = Integer.MAX_VALUE; - - for (UUID nodeId : topIds) { - int load = nodeLoads.get(nodeId); - - if (bestNode == null || bestLoad > load) { - bestNode = nodeId; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - - assert bestNode != null; - - return bestNode; - } - - /** - * Create plan for reducers. - * - * @param top Topology. - * @param mappers Mappers map. - * @param reducerCnt Reducers count. - * @return Reducers map. - */ - private Map<UUID, int[]> reducers(Collection<ClusterNode> top, - Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) { - // Determine initial node weights. - int totalWeight = 0; - - List<WeightedNode> nodes = new ArrayList<>(top.size()); - - for (ClusterNode node : top) { - Collection<GridHadoopInputSplit> split = mappers.get(node.id()); - - int weight = reducerNodeWeight(node, split != null ? split.size() : 0); - - nodes.add(new WeightedNode(node.id(), weight, weight)); - - totalWeight += weight; - } - - // Adjust weights. - int totalAdjustedWeight = 0; - - for (WeightedNode node : nodes) { - node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; - - node.weight = Math.round(node.floatWeight); - - totalAdjustedWeight += node.weight; - } - - // Apply redundant/lost reducers. - Collections.sort(nodes); - - if (totalAdjustedWeight > reducerCnt) { - // Too much reducers set. - ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasPrevious()) - iter = nodes.listIterator(nodes.size() - 1); - - WeightedNode node = iter.previous(); - - if (node.weight > 0) { - node.weight -= 1; - - totalAdjustedWeight--; - } - } - } - else if (totalAdjustedWeight < reducerCnt) { - // Not enough reducers set. - ListIterator<WeightedNode> iter = nodes.listIterator(0); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasNext()) - iter = nodes.listIterator(0); - - WeightedNode node = iter.next(); - - if (node.floatWeight > 0.0f) { - node.weight += 1; - - totalAdjustedWeight++; - } - } - } - - int idx = 0; - - Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f); - - for (WeightedNode node : nodes) { - if (node.weight > 0) { - int[] arr = new int[node.weight]; - - for (int i = 0; i < arr.length; i++) - arr[i] = idx++; - - reducers.put(node.nodeId, arr); - } - } - - return reducers; - } - - /** - * Calculate node weight based on node metrics and data co-location. - * - * @param node Node. - * @param splitCnt Splits mapped to this node. - * @return Node weight. - */ - @SuppressWarnings("UnusedParameters") - protected int reducerNodeWeight(ClusterNode node, int splitCnt) { - return splitCnt; - } - - /** - * Weighted node. - */ - private static class WeightedNode implements Comparable<WeightedNode> { - /** Node ID. */ - private final UUID nodeId; - - /** Weight. */ - private int weight; - - /** Floating point weight. */ - private float floatWeight; - - /** - * Constructor. - * - * @param nodeId Node ID. - * @param weight Weight. - * @param floatWeight Floating point weight. - */ - private WeightedNode(UUID nodeId, int weight, float floatWeight) { - this.nodeId = nodeId; - this.weight = weight; - this.floatWeight = floatWeight; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return nodeId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull WeightedNode other) { - float res = other.floatWeight - floatWeight; - - return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java new file mode 100644 index 0000000..9ec2b5b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlan.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.planner; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Map-reduce plan. + */ +public class HadoopDefaultMapReducePlan implements GridHadoopMapReducePlan { + /** */ + private static final long serialVersionUID = 0L; + + /** Mappers map. */ + private Map<UUID, Collection<GridHadoopInputSplit>> mappers; + + /** Reducers map. */ + private Map<UUID, int[]> reducers; + + /** Mappers count. */ + private int mappersCnt; + + /** Reducers count. */ + private int reducersCnt; + + /** + * @param mappers Mappers map. + * @param reducers Reducers map. + */ + public HadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers, + Map<UUID, int[]> reducers) { + this.mappers = mappers; + this.reducers = reducers; + + if (mappers != null) { + for (Collection<GridHadoopInputSplit> splits : mappers.values()) + mappersCnt += splits.size(); + } + + if (reducers != null) { + for (int[] rdcrs : reducers.values()) + reducersCnt += rdcrs.length; + } + } + + /** {@inheritDoc} */ + @Override public int mappers() { + return mappersCnt; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + return reducersCnt; + } + + /** {@inheritDoc} */ + @Override public UUID nodeForReducer(int reducer) { + assert reducer >= 0 && reducer < reducersCnt : reducer; + + for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) { + for (int r : entry.getValue()) { + if (r == reducer) + return entry.getKey(); + } + } + + throw new IllegalStateException("Not found reducer index: " + reducer); + } + + /** {@inheritDoc} */ + @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) { + return mappers.get(nodeId); + } + + /** {@inheritDoc} */ + @Override @Nullable public int[] reducers(UUID nodeId) { + return reducers.get(nodeId); + } + + /** {@inheritDoc} */ + @Override public Collection<UUID> mapperNodeIds() { + return mappers.keySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<UUID> reducerNodeIds() { + return reducers.keySet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java new file mode 100644 index 0000000..01a7471 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.planner; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.hadoop.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.IgniteFs.*; + +/** + * Default map-reduce planner implementation. + */ +public class HadoopDefaultMapReducePlanner implements GridHadoopMapReducePlanner { + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Logger. */ + @SuppressWarnings("UnusedDeclaration") + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, + @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { + // Convert collection of topology nodes to collection of topology node IDs. + Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); + + for (ClusterNode topNode : top) + topIds.add(topNode.id()); + + Map<UUID, Collection<GridHadoopInputSplit>> mappers = mappers(top, topIds, job.input()); + + int rdcCnt = job.info().reducers(); + + if (rdcCnt < 0) + throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); + + Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); + + return new HadoopDefaultMapReducePlan(mappers, reducers); + } + + /** + * Create plan for mappers. + * + * @param top Topology nodes. + * @param topIds Topology node IDs. + * @param splits Splits. + * @return Mappers map. + * @throws IgniteCheckedException If failed. + */ + private Map<UUID, Collection<GridHadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, + Iterable<GridHadoopInputSplit> splits) throws IgniteCheckedException { + Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); + + Map<String, Collection<UUID>> nodes = hosts(top); + + Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. + + for (UUID nodeId : topIds) + nodeLoads.put(nodeId, 0); + + for (GridHadoopInputSplit split : splits) { + UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); + + if (log.isDebugEnabled()) + log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); + + Collection<GridHadoopInputSplit> nodeSplits = mappers.get(nodeId); + + if (nodeSplits == null) { + nodeSplits = new ArrayList<>(); + + mappers.put(nodeId, nodeSplits); + } + + nodeSplits.add(split); + + // Updated node load. + nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); + } + + return mappers; + } + + /** + * Groups nodes by host names. + * + * @param top Topology to group. + * @return Map. + */ + private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) { + Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); + + for (ClusterNode node : top) { + for (String host : node.hostNames()) { + Collection<UUID> nodeIds = grouped.get(host); + + if (nodeIds == null) { + // Expecting 1-2 nodes per host. + nodeIds = new ArrayList<>(2); + + grouped.put(host, nodeIds); + } + + nodeIds.add(node.id()); + } + } + + return grouped; + } + + /** + * Determine the best node for this split. + * + * @param split Split. + * @param topIds Topology node IDs. + * @param nodes Nodes. + * @param nodeLoads Node load tracker. + * @return Node ID. + */ + @SuppressWarnings("unchecked") + private UUID nodeForSplit(GridHadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, + Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { + if (split instanceof GridHadoopFileBlock) { + GridHadoopFileBlock split0 = (GridHadoopFileBlock)split; + + if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { + IgfsHadoopEndpoint endpoint = new IgfsHadoopEndpoint(split0.file().getAuthority()); + + IgfsEx igfs = null; + + if (F.eq(ignite.name(), endpoint.grid())) + igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); + + if (igfs != null && !igfs.isProxy(split0.file())) { + Collection<IgfsBlockLocation> blocks; + + try { + blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length()); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } + + assert blocks != null; + + if (blocks.size() == 1) + // Fast-path, split consists of one IGFS block (as in most cases). + return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); + else { + // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. + Map<UUID, Long> nodeMap = new HashMap<>(); + + List<UUID> bestNodeIds = null; + long bestLen = -1L; + + for (IgfsBlockLocation block : blocks) { + for (UUID blockNodeId : block.nodeIds()) { + if (topIds.contains(blockNodeId)) { + Long oldLen = nodeMap.get(blockNodeId); + long newLen = oldLen == null ? block.length() : oldLen + block.length(); + + nodeMap.put(blockNodeId, newLen); + + if (bestNodeIds == null || bestLen < newLen) { + bestNodeIds = new ArrayList<>(1); + + bestNodeIds.add(blockNodeId); + + bestLen = newLen; + } + else if (bestLen == newLen) { + assert !F.isEmpty(bestNodeIds); + + bestNodeIds.add(blockNodeId); + } + } + } + } + + if (bestNodeIds != null) { + return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : + bestNode(bestNodeIds, topIds, nodeLoads, true); + } + } + } + } + } + + // Cannot use local IGFS for some reason, try selecting the node by host. + Collection<UUID> blockNodes = null; + + for (String host : split.hosts()) { + Collection<UUID> hostNodes = nodes.get(host); + + if (!F.isEmpty(hostNodes)) { + if (blockNodes == null) + blockNodes = new ArrayList<>(hostNodes); + else + blockNodes.addAll(hostNodes); + } + } + + return bestNode(blockNodes, topIds, nodeLoads, false); + } + + /** + * Finds the best (the least loaded) node among the candidates. + * + * @param candidates Candidates. + * @param topIds Topology node IDs. + * @param nodeLoads Known node loads. + * @param skipTopCheck Whether to skip topology check. + * @return The best node. + */ + private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, + boolean skipTopCheck) { + UUID bestNode = null; + int bestLoad = Integer.MAX_VALUE; + + if (candidates != null) { + for (UUID candidate : candidates) { + if (skipTopCheck || topIds.contains(candidate)) { + int load = nodeLoads.get(candidate); + + if (bestNode == null || bestLoad > load) { + bestNode = candidate; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. + } + } + } + } + + if (bestNode == null) { + // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. + bestLoad = Integer.MAX_VALUE; + + for (UUID nodeId : topIds) { + int load = nodeLoads.get(nodeId); + + if (bestNode == null || bestLoad > load) { + bestNode = nodeId; + bestLoad = load; + + if (bestLoad == 0) + break; // Minimum load possible, no need for further iterations. + } + } + } + + assert bestNode != null; + + return bestNode; + } + + /** + * Create plan for reducers. + * + * @param top Topology. + * @param mappers Mappers map. + * @param reducerCnt Reducers count. + * @return Reducers map. + */ + private Map<UUID, int[]> reducers(Collection<ClusterNode> top, + Map<UUID, Collection<GridHadoopInputSplit>> mappers, int reducerCnt) { + // Determine initial node weights. + int totalWeight = 0; + + List<WeightedNode> nodes = new ArrayList<>(top.size()); + + for (ClusterNode node : top) { + Collection<GridHadoopInputSplit> split = mappers.get(node.id()); + + int weight = reducerNodeWeight(node, split != null ? split.size() : 0); + + nodes.add(new WeightedNode(node.id(), weight, weight)); + + totalWeight += weight; + } + + // Adjust weights. + int totalAdjustedWeight = 0; + + for (WeightedNode node : nodes) { + node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; + + node.weight = Math.round(node.floatWeight); + + totalAdjustedWeight += node.weight; + } + + // Apply redundant/lost reducers. + Collections.sort(nodes); + + if (totalAdjustedWeight > reducerCnt) { + // Too much reducers set. + ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasPrevious()) + iter = nodes.listIterator(nodes.size() - 1); + + WeightedNode node = iter.previous(); + + if (node.weight > 0) { + node.weight -= 1; + + totalAdjustedWeight--; + } + } + } + else if (totalAdjustedWeight < reducerCnt) { + // Not enough reducers set. + ListIterator<WeightedNode> iter = nodes.listIterator(0); + + while (totalAdjustedWeight != reducerCnt) { + if (!iter.hasNext()) + iter = nodes.listIterator(0); + + WeightedNode node = iter.next(); + + if (node.floatWeight > 0.0f) { + node.weight += 1; + + totalAdjustedWeight++; + } + } + } + + int idx = 0; + + Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f); + + for (WeightedNode node : nodes) { + if (node.weight > 0) { + int[] arr = new int[node.weight]; + + for (int i = 0; i < arr.length; i++) + arr[i] = idx++; + + reducers.put(node.nodeId, arr); + } + } + + return reducers; + } + + /** + * Calculate node weight based on node metrics and data co-location. + * + * @param node Node. + * @param splitCnt Splits mapped to this node. + * @return Node weight. + */ + @SuppressWarnings("UnusedParameters") + protected int reducerNodeWeight(ClusterNode node, int splitCnt) { + return splitCnt; + } + + /** + * Weighted node. + */ + private static class WeightedNode implements Comparable<WeightedNode> { + /** Node ID. */ + private final UUID nodeId; + + /** Weight. */ + private int weight; + + /** Floating point weight. */ + private float floatWeight; + + /** + * Constructor. + * + * @param nodeId Node ID. + * @param weight Weight. + * @param floatWeight Floating point weight. + */ + private WeightedNode(UUID nodeId, int weight, float floatWeight) { + this.nodeId = nodeId; + this.weight = weight; + this.floatWeight = floatWeight; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return nodeId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull WeightedNode other) { + float res = other.floatWeight - floatWeight; + + return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java deleted file mode 100644 index 37073d9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobCountersTask.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -import java.util.*; - -/** - * Task to get job counters. - */ -public class GridHadoopProtocolJobCountersTask extends GridHadoopProtocolTaskAdapter<GridHadoopCounters> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - - assert nodeId != null; - assert id != null; - - return hadoop.counters(new GridHadoopJobId(nodeId, id)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java deleted file mode 100644 index de4f89c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolJobStatusTask.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; - -import java.util.*; - -/** - * Job status task. - */ -public class GridHadoopProtocolJobStatusTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> { - /** */ - private static final long serialVersionUID = 0L; - - /** Default poll delay */ - private static final long DFLT_POLL_DELAY = 100L; - - /** Attribute for held status. */ - private static final String ATTR_HELD = "held"; - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - Long pollDelay = args.get(2); - - assert nodeId != null; - assert id != null; - - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); - - if (pollDelay == null) - pollDelay = DFLT_POLL_DELAY; - - if (pollDelay > 0) { - IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); - - if (fut != null) { - if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) - return hadoop.status(jobId); - else { - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut0) { - jobCtx.callcc(); - } - }); - - jobCtx.setAttribute(ATTR_HELD, true); - - return jobCtx.holdcc(pollDelay); - } - } - else - return null; - } - else - return hadoop.status(jobId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java deleted file mode 100644 index 384bc23..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -import java.util.*; - -/** - * Kill job task. - */ -public class GridHadoopProtocolKillJobTask extends GridHadoopProtocolTaskAdapter<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - - assert nodeId != null; - assert id != null; - - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); - - return hadoop.kill(jobId); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java deleted file mode 100644 index f76f3b6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Task to get the next job ID. - */ -public class GridHadoopProtocolNextTaskIdTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobId> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) { - return hadoop.nextJobId(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java deleted file mode 100644 index 8fdab9d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.processors.hadoop.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; - -/** - * Submit job task. - */ -public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop, - GridHadoopProtocolTaskArguments args) throws IgniteCheckedException { - UUID nodeId = UUID.fromString(args.<String>get(0)); - Integer id = args.get(1); - HadoopDefaultJobInfo info = args.get(2); - - assert nodeId != null; - assert id != null; - assert info != null; - - GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); - - hadoop.submit(jobId, info); - - GridHadoopJobStatus res = hadoop.status(jobId); - - if (res == null) // Submission failed. - res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); - - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java deleted file mode 100644 index 086545c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Hadoop protocol task adapter. - */ -public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<GridHadoopProtocolTaskArguments, R> { - /** {@inheritDoc} */ - @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable GridHadoopProtocolTaskArguments arg) { - return Collections.singletonMap(new Job(arg), subgrid.get(0)); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { - return ComputeJobResultPolicy.REDUCE; - } - - /** {@inheritDoc} */ - @Nullable @Override public R reduce(List<ComputeJobResult> results) { - if (!F.isEmpty(results)) { - ComputeJobResult res = results.get(0); - - return res.getData(); - } - else - return null; - } - - /** - * Job wrapper. - */ - private class Job implements ComputeJob { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - @SuppressWarnings("UnusedDeclaration") - @JobContextResource - private ComputeJobContext jobCtx; - - /** Argument. */ - private final GridHadoopProtocolTaskArguments args; - - /** - * Constructor. - * - * @param args Job argument. - */ - private Job(GridHadoopProtocolTaskArguments args) { - this.args = args; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - // No-op. - } - - /** {@inheritDoc} */ - @Nullable @Override public Object execute() { - try { - return run(jobCtx, ((IgniteEx)ignite).hadoop(), args); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - } - - /** - * Run the task. - * - * @param jobCtx Job context. - * @param hadoop Hadoop facade. - * @param args Arguments. - * @return Job result. - * @throws IgniteCheckedException If failed. - */ - public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, GridHadoopProtocolTaskArguments args) - throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java deleted file mode 100644 index ae91a52..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.proto; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task arguments. - */ -public class GridHadoopProtocolTaskArguments implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Arguments. */ - private Object[] args; - - /** - * {@link Externalizable} support. - */ - public GridHadoopProtocolTaskArguments() { - // No-op. - } - - /** - * Constructor. - * - * @param args Arguments. - */ - public GridHadoopProtocolTaskArguments(Object... args) { - this.args = args; - } - - /** - * @param idx Argument index. - * @return Argument. - */ - @SuppressWarnings("unchecked") - @Nullable public <T> T get(int idx) { - return (args != null && args.length > idx) ? (T)args[idx] : null; - } - - /** - * @return Size. - */ - public int size() { - return args != null ? args.length : 0; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeArray(out, args); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - args = U.readArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopProtocolTaskArguments.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java index 66fb230..3a766c3 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java @@ -82,7 +82,7 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); - GridHadoopJobId jobID = cli.compute().execute(GridHadoopProtocolNextTaskIdTask.class.getName(), null); + GridHadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null); conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis()); @@ -99,8 +99,8 @@ public class HadoopClientProtocol implements ClientProtocol { try { conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis()); - GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolSubmitJobTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); + GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf))); if (status == null) throw new IOException("Failed to submit job (null status obtained): " + jobId); @@ -135,8 +135,8 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public void killJob(JobID jobId) throws IOException, InterruptedException { try { - cli.compute().execute(GridHadoopProtocolKillJobTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + cli.compute().execute(HadoopProtocolKillJobTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); } catch (GridClientException e) { throw new IOException("Failed to kill job: " + jobId, e); @@ -159,11 +159,11 @@ public class HadoopClientProtocol implements ClientProtocol { try { Long delay = conf.getLong(GridHadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1); - GridHadoopProtocolTaskArguments args = delay >= 0 ? - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); + HadoopProtocolTaskArguments args = delay >= 0 ? + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) : + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()); - GridHadoopJobStatus status = cli.compute().execute(GridHadoopProtocolJobStatusTask.class.getName(), args); + GridHadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args); if (status == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); @@ -178,8 +178,8 @@ public class HadoopClientProtocol implements ClientProtocol { /** {@inheritDoc} */ @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException { try { - final GridHadoopCounters counters = cli.compute().execute(GridHadoopProtocolJobCountersTask.class.getName(), - new GridHadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); + final GridHadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(), + new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId())); if (counters == null) throw new IOException("Job tracker doesn't have any information about the job: " + jobId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java new file mode 100644 index 0000000..56da194 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +import java.util.*; + +/** + * Task to get job counters. + */ +public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<GridHadoopCounters> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public GridHadoopCounters run(ComputeJobContext jobCtx, GridHadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + + assert nodeId != null; + assert id != null; + + return hadoop.counters(new GridHadoopJobId(nodeId, id)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java new file mode 100644 index 0000000..ac70c44 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * Job status task. + */ +public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> { + /** */ + private static final long serialVersionUID = 0L; + + /** Default poll delay */ + private static final long DFLT_POLL_DELAY = 100L; + + /** Attribute for held status. */ + private static final String ATTR_HELD = "held"; + + /** {@inheritDoc} */ + @Override public GridHadoopJobStatus run(final ComputeJobContext jobCtx, GridHadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + Long pollDelay = args.get(2); + + assert nodeId != null; + assert id != null; + + GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + + if (pollDelay == null) + pollDelay = DFLT_POLL_DELAY; + + if (pollDelay > 0) { + IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); + + if (fut != null) { + if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) + return hadoop.status(jobId); + else { + fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut0) { + jobCtx.callcc(); + } + }); + + jobCtx.setAttribute(ATTR_HELD, true); + + return jobCtx.holdcc(pollDelay); + } + } + else + return null; + } + else + return hadoop.status(jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java new file mode 100644 index 0000000..8522ab0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +import java.util.*; + +/** + * Kill job task. + */ +public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + + assert nodeId != null; + assert id != null; + + GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + + return hadoop.kill(jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java new file mode 100644 index 0000000..357e12d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * Task to get the next job ID. + */ +public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<GridHadoopJobId> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop, + HadoopProtocolTaskArguments args) { + return hadoop.nextJobId(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java new file mode 100644 index 0000000..df03c79 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.hadoop.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*; + +/** + * Submit job task. + */ +public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<GridHadoopJobStatus> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop, + HadoopProtocolTaskArguments args) throws IgniteCheckedException { + UUID nodeId = UUID.fromString(args.<String>get(0)); + Integer id = args.get(1); + HadoopDefaultJobInfo info = args.get(2); + + assert nodeId != null; + assert id != null; + assert info != null; + + GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id); + + hadoop.submit(jobId, info); + + GridHadoopJobStatus res = hadoop.status(jobId); + + if (res == null) // Submission failed. + res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java new file mode 100644 index 0000000..6938d1c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Hadoop protocol task adapter. + */ +public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> { + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable HadoopProtocolTaskArguments arg) { + return Collections.singletonMap(new Job(arg), subgrid.get(0)); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + return ComputeJobResultPolicy.REDUCE; + } + + /** {@inheritDoc} */ + @Nullable @Override public R reduce(List<ComputeJobResult> results) { + if (!F.isEmpty(results)) { + ComputeJobResult res = results.get(0); + + return res.getData(); + } + else + return null; + } + + /** + * Job wrapper. + */ + private class Job implements ComputeJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @SuppressWarnings("UnusedDeclaration") + @JobContextResource + private ComputeJobContext jobCtx; + + /** Argument. */ + private final HadoopProtocolTaskArguments args; + + /** + * Constructor. + * + * @param args Job argument. + */ + private Job(HadoopProtocolTaskArguments args) { + this.args = args; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Object execute() { + try { + return run(jobCtx, ((IgniteEx)ignite).hadoop(), args); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + } + + /** + * Run the task. + * + * @param jobCtx Job context. + * @param hadoop Hadoop facade. + * @param args Arguments. + * @return Job result. + * @throws IgniteCheckedException If failed. + */ + public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, HadoopProtocolTaskArguments args) + throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java new file mode 100644 index 0000000..5c470ba --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.proto; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task arguments. + */ +public class HadoopProtocolTaskArguments implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Arguments. */ + private Object[] args; + + /** + * {@link Externalizable} support. + */ + public HadoopProtocolTaskArguments() { + // No-op. + } + + /** + * Constructor. + * + * @param args Arguments. + */ + public HadoopProtocolTaskArguments(Object... args) { + this.args = args; + } + + /** + * @param idx Argument index. + * @return Argument. + */ + @SuppressWarnings("unchecked") + @Nullable public <T> T get(int idx) { + return (args != null && args.length > idx) ? (T)args[idx] : null; + } + + /** + * @return Size. + */ + public int size() { + return args != null ? args.length : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeArray(out, args); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + args = U.readArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProtocolTaskArguments.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java deleted file mode 100644 index a8a52a9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Acknowledgement message. - */ -public class GridHadoopShuffleAck implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** - * - */ - public GridHadoopShuffleAck() { - // No-op. - } - - /** - * @param msgId Message ID. - */ - public GridHadoopShuffleAck(long msgId, GridHadoopJobId jobId) { - assert jobId != null; - - this.msgId = msgId; - this.jobId = jobId; - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopShuffleAck.class, this); - } -}