http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java deleted file mode 100644 index d82bae7..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopDefaultMapReducePlanner.java +++ /dev/null @@ -1,434 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.planner; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.igfs.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.IgniteFileSystem.*; - -/** - * Default map-reduce planner implementation. - */ -public class HadoopDefaultMapReducePlanner implements HadoopMapReducePlanner { - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Logger. */ - @SuppressWarnings("UnusedDeclaration") - @LoggerResource - private IgniteLogger log; - - /** {@inheritDoc} */ - @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, - @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { - // Convert collection of topology nodes to collection of topology node IDs. - Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f); - - for (ClusterNode topNode : top) - topIds.add(topNode.id()); - - Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input()); - - int rdcCnt = job.info().reducers(); - - if (rdcCnt < 0) - throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt); - - Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt); - - return new HadoopDefaultMapReducePlan(mappers, reducers); - } - - /** - * Create plan for mappers. - * - * @param top Topology nodes. - * @param topIds Topology node IDs. - * @param splits Splits. - * @return Mappers map. - * @throws IgniteCheckedException If failed. - */ - private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds, - Iterable<HadoopInputSplit> splits) throws IgniteCheckedException { - Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - - Map<String, Collection<UUID>> nodes = hosts(top); - - Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load. - - for (UUID nodeId : topIds) - nodeLoads.put(nodeId, 0); - - for (HadoopInputSplit split : splits) { - UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads); - - if (log.isDebugEnabled()) - log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']'); - - Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId); - - if (nodeSplits == null) { - nodeSplits = new ArrayList<>(); - - mappers.put(nodeId, nodeSplits); - } - - nodeSplits.add(split); - - // Updated node load. - nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1); - } - - return mappers; - } - - /** - * Groups nodes by host names. - * - * @param top Topology to group. - * @return Map. - */ - private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) { - Map<String, Collection<UUID>> grouped = U.newHashMap(top.size()); - - for (ClusterNode node : top) { - for (String host : node.hostNames()) { - Collection<UUID> nodeIds = grouped.get(host); - - if (nodeIds == null) { - // Expecting 1-2 nodes per host. - nodeIds = new ArrayList<>(2); - - grouped.put(host, nodeIds); - } - - nodeIds.add(node.id()); - } - } - - return grouped; - } - - /** - * Determine the best node for this split. - * - * @param split Split. - * @param topIds Topology node IDs. - * @param nodes Nodes. - * @param nodeLoads Node load tracker. - * @return Node ID. - */ - @SuppressWarnings("unchecked") - private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes, - Map<UUID, Integer> nodeLoads) throws IgniteCheckedException { - if (split instanceof HadoopFileBlock) { - HadoopFileBlock split0 = (HadoopFileBlock)split; - - if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) { - HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority()); - - IgfsEx igfs = null; - - if (F.eq(ignite.name(), endpoint.grid())) - igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs()); - - if (igfs != null && !igfs.isProxy(split0.file())) { - Collection<IgfsBlockLocation> blocks; - - try { - blocks = igfs.affinity(new IgfsPath(split0.file()), split0.start(), split0.length()); - } - catch (IgniteException e) { - throw new IgniteCheckedException(e); - } - - assert blocks != null; - - if (blocks.size() == 1) - // Fast-path, split consists of one IGFS block (as in most cases). - return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false); - else { - // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes. - Map<UUID, Long> nodeMap = new HashMap<>(); - - List<UUID> bestNodeIds = null; - long bestLen = -1L; - - for (IgfsBlockLocation block : blocks) { - for (UUID blockNodeId : block.nodeIds()) { - if (topIds.contains(blockNodeId)) { - Long oldLen = nodeMap.get(blockNodeId); - long newLen = oldLen == null ? block.length() : oldLen + block.length(); - - nodeMap.put(blockNodeId, newLen); - - if (bestNodeIds == null || bestLen < newLen) { - bestNodeIds = new ArrayList<>(1); - - bestNodeIds.add(blockNodeId); - - bestLen = newLen; - } - else if (bestLen == newLen) { - assert !F.isEmpty(bestNodeIds); - - bestNodeIds.add(blockNodeId); - } - } - } - } - - if (bestNodeIds != null) { - return bestNodeIds.size() == 1 ? bestNodeIds.get(0) : - bestNode(bestNodeIds, topIds, nodeLoads, true); - } - } - } - } - } - - // Cannot use local IGFS for some reason, try selecting the node by host. - Collection<UUID> blockNodes = null; - - for (String host : split.hosts()) { - Collection<UUID> hostNodes = nodes.get(host); - - if (!F.isEmpty(hostNodes)) { - if (blockNodes == null) - blockNodes = new ArrayList<>(hostNodes); - else - blockNodes.addAll(hostNodes); - } - } - - return bestNode(blockNodes, topIds, nodeLoads, false); - } - - /** - * Finds the best (the least loaded) node among the candidates. - * - * @param candidates Candidates. - * @param topIds Topology node IDs. - * @param nodeLoads Known node loads. - * @param skipTopCheck Whether to skip topology check. - * @return The best node. - */ - private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads, - boolean skipTopCheck) { - UUID bestNode = null; - int bestLoad = Integer.MAX_VALUE; - - if (candidates != null) { - for (UUID candidate : candidates) { - if (skipTopCheck || topIds.contains(candidate)) { - int load = nodeLoads.get(candidate); - - if (bestNode == null || bestLoad > load) { - bestNode = candidate; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - } - - if (bestNode == null) { - // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one. - bestLoad = Integer.MAX_VALUE; - - for (UUID nodeId : topIds) { - int load = nodeLoads.get(nodeId); - - if (bestNode == null || bestLoad > load) { - bestNode = nodeId; - bestLoad = load; - - if (bestLoad == 0) - break; // Minimum load possible, no need for further iterations. - } - } - } - - assert bestNode != null; - - return bestNode; - } - - /** - * Create plan for reducers. - * - * @param top Topology. - * @param mappers Mappers map. - * @param reducerCnt Reducers count. - * @return Reducers map. - */ - private Map<UUID, int[]> reducers(Collection<ClusterNode> top, - Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) { - // Determine initial node weights. - int totalWeight = 0; - - List<WeightedNode> nodes = new ArrayList<>(top.size()); - - for (ClusterNode node : top) { - Collection<HadoopInputSplit> split = mappers.get(node.id()); - - int weight = reducerNodeWeight(node, split != null ? split.size() : 0); - - nodes.add(new WeightedNode(node.id(), weight, weight)); - - totalWeight += weight; - } - - // Adjust weights. - int totalAdjustedWeight = 0; - - for (WeightedNode node : nodes) { - node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight; - - node.weight = Math.round(node.floatWeight); - - totalAdjustedWeight += node.weight; - } - - // Apply redundant/lost reducers. - Collections.sort(nodes); - - if (totalAdjustedWeight > reducerCnt) { - // Too much reducers set. - ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasPrevious()) - iter = nodes.listIterator(nodes.size() - 1); - - WeightedNode node = iter.previous(); - - if (node.weight > 0) { - node.weight -= 1; - - totalAdjustedWeight--; - } - } - } - else if (totalAdjustedWeight < reducerCnt) { - // Not enough reducers set. - ListIterator<WeightedNode> iter = nodes.listIterator(0); - - while (totalAdjustedWeight != reducerCnt) { - if (!iter.hasNext()) - iter = nodes.listIterator(0); - - WeightedNode node = iter.next(); - - if (node.floatWeight > 0.0f) { - node.weight += 1; - - totalAdjustedWeight++; - } - } - } - - int idx = 0; - - Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f); - - for (WeightedNode node : nodes) { - if (node.weight > 0) { - int[] arr = new int[node.weight]; - - for (int i = 0; i < arr.length; i++) - arr[i] = idx++; - - reducers.put(node.nodeId, arr); - } - } - - return reducers; - } - - /** - * Calculate node weight based on node metrics and data co-location. - * - * @param node Node. - * @param splitCnt Splits mapped to this node. - * @return Node weight. - */ - @SuppressWarnings("UnusedParameters") - protected int reducerNodeWeight(ClusterNode node, int splitCnt) { - return splitCnt; - } - - /** - * Weighted node. - */ - private static class WeightedNode implements Comparable<WeightedNode> { - /** Node ID. */ - private final UUID nodeId; - - /** Weight. */ - private int weight; - - /** Floating point weight. */ - private float floatWeight; - - /** - * Constructor. - * - * @param nodeId Node ID. - * @param weight Weight. - * @param floatWeight Floating point weight. - */ - private WeightedNode(UUID nodeId, int weight, float floatWeight) { - this.nodeId = nodeId; - this.weight = weight; - this.floatWeight = floatWeight; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return nodeId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(@NotNull WeightedNode other) { - float res = other.floatWeight - floatWeight; - - return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index b7aafff..d907a6c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -143,7 +143,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA private void startNodes() throws Exception { if (mode != PRIMARY) { // Start secondary IGFS. - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -182,7 +182,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setLocalHost(U.getLocalHost().getHostAddress()); cfg.setCommunicationSpi(communicationSpi()); @@ -208,7 +208,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(cacheConfiguration(gridName)); - cfg.setIgfsConfiguration(igfsConfiguration(gridName)); + cfg.setFileSystemConfiguration(igfsConfiguration(gridName)); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setLocalHost("127.0.0.1"); cfg.setCommunicationSpi(communicationSpi()); @@ -251,8 +251,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA * @param gridName Grid name. * @return IGFS configuration. */ - protected IgfsConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { - IgfsConfiguration cfg = new IgfsConfiguration(); + protected FileSystemConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { + FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setDataCacheName("partitioned"); cfg.setMetaCacheName("replicated"); @@ -466,7 +466,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA FSDataOutputStream os = fs.create(file, EnumSet.noneOf(CreateFlag.class), Options.CreateOpts.perms(FsPermission.getDefault())); - final int cnt = 5 * IgfsConfiguration.DFLT_BLOCK_SIZE; // Write 5 blocks. + final int cnt = 5 * FileSystemConfiguration.DFLT_BLOCK_SIZE; // Write 5 blocks. for (int i = 0; i < cnt; i++) os.writeInt(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java index a7fa88e..e89d015 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfsDualAbstractSelfTest.java @@ -125,7 +125,7 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT */ protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable Map<String, String> restCfg) throws Exception { - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("dataCache"); igfsCfg.setMetaCacheName("metaCache"); @@ -167,7 +167,7 @@ public abstract class HadoopIgfsDualAbstractSelfTest extends IgfsCommonAbstractT cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setLocalHost("127.0.0.1"); cfg.setConnectorConfiguration(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java index de5aea1..f6f5bae 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java @@ -76,8 +76,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class ShmemPrivate extends IgfsEventsAbstractSelfTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "shmem"); @@ -93,8 +93,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class LoopbackPrivate extends IgfsEventsAbstractSelfTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "tcp"); @@ -113,8 +113,8 @@ public class IgfsEventsTestSuite extends TestSuite { private static IgniteFileSystem igfsSec; /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", @@ -126,8 +126,8 @@ public class IgfsEventsTestSuite extends TestSuite { /** * @return IGFS configuration for secondary file system. */ - protected IgfsConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setName("igfs-secondary"); igfsCfg.setDefaultMode(PRIMARY); @@ -184,8 +184,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class ShmemDualSync extends PrimarySecondaryTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setDefaultMode(DUAL_SYNC); @@ -198,8 +198,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class ShmemDualAsync extends PrimarySecondaryTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setDefaultMode(DUAL_ASYNC); @@ -212,8 +212,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/", @@ -223,8 +223,8 @@ public class IgfsEventsTestSuite extends TestSuite { } /** {@inheritDoc} */ - @Override protected IgfsConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getSecondaryIgfsConfiguration(); + @Override protected FileSystemConfiguration getSecondaryIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getSecondaryIgfsConfiguration(); igfsCfg.setName("igfs-secondary"); igfsCfg.setDefaultMode(PRIMARY); @@ -242,8 +242,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setDefaultMode(DUAL_SYNC); @@ -256,8 +256,8 @@ public class IgfsEventsTestSuite extends TestSuite { */ public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest { /** {@inheritDoc} */ - @Override protected IgfsConfiguration getIgfsConfiguration() throws IgniteCheckedException { - IgfsConfiguration igfsCfg = super.getIgfsConfiguration(); + @Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException { + FileSystemConfiguration igfsCfg = super.getIgfsConfiguration(); igfsCfg.setDefaultMode(DUAL_ASYNC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java index d27d93d..d128731 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java @@ -75,7 +75,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoSpi); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("partitioned"); @@ -88,7 +88,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setCacheConfiguration(cacheConfiguration(gridName)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index ffafd90..be25c61 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -173,7 +173,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA private void startNodes() throws Exception { if (mode != PRIMARY) { // Start secondary IGFS. - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -211,7 +211,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setCommunicationSpi(communicationSpi()); @@ -277,7 +277,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(cacheConfiguration(gridName)); - cfg.setIgfsConfiguration(igfsConfiguration(gridName)); + cfg.setFileSystemConfiguration(igfsConfiguration(gridName)); cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); cfg.setCommunicationSpi(communicationSpi()); @@ -319,8 +319,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA * @param gridName Grid name. * @return IGFS configuration. */ - protected IgfsConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { - IgfsConfiguration cfg = new IgfsConfiguration(); + protected FileSystemConfiguration igfsConfiguration(String gridName) throws IgniteCheckedException { + FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setDataCacheName("partitioned"); cfg.setMetaCacheName("replicated"); @@ -649,7 +649,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA FSDataOutputStream os = fs.create(file, false, 128); - final int cnt = 5 * IgfsConfiguration.DFLT_BLOCK_SIZE; // Write 5 blocks. + final int cnt = 5 * FileSystemConfiguration.DFLT_BLOCK_SIZE; // Write 5 blocks. for (int i = 0; i < cnt; i++) os.writeInt(i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index 14f7970..29dd996 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -66,7 +66,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest cfg.setDiscoverySpi(discoSpi); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -78,7 +78,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest }}); cfg.setCacheConfiguration(cacheConfiguration()); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java index 6208ce0..3b4c5c2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemHandshakeSelfTest.java @@ -227,7 +227,7 @@ public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractT cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -242,7 +242,7 @@ public class IgniteHadoopFileSystemHandshakeSelfTest extends IgfsCommonAbstractT igfsCfg.setManagementPort(-1); igfsCfg.setBlockSize(512 * 1024); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java index 9a0726b..135a488 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -65,12 +65,12 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe cfg.setDiscoverySpi(discoSpi); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); igfsCfg.setName("igfs"); - igfsCfg.setManagementPort(IgfsConfiguration.DFLT_MGMT_PORT + cnt); + igfsCfg.setManagementPort(FileSystemConfiguration.DFLT_MGMT_PORT + cnt); igfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ put("type", "shmem"); @@ -79,7 +79,7 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe igfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setCacheConfiguration(cacheConfiguration()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java index fdd2711..1f6a204 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemLoggerStateSelfTest.java @@ -75,7 +75,7 @@ public class IgniteHadoopFileSystemLoggerStateSelfTest extends IgfsCommonAbstrac * @throws Exception If failed. */ private void startUp() throws Exception { - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -116,7 +116,7 @@ public class IgniteHadoopFileSystemLoggerStateSelfTest extends IgfsCommonAbstrac cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setLocalHost("127.0.0.1"); cfg.setConnectorConfiguration(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java index e99f7d6..3f20070 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemSecondaryModeSelfTest.java @@ -83,7 +83,7 @@ public class IgniteHadoopFileSystemSecondaryModeSelfTest extends IgfsCommonAbstr private void startUp() throws Exception { startUpSecondary(); - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -130,7 +130,7 @@ public class IgniteHadoopFileSystemSecondaryModeSelfTest extends IgfsCommonAbstr cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setLocalHost("127.0.0.1"); @@ -151,7 +151,7 @@ public class IgniteHadoopFileSystemSecondaryModeSelfTest extends IgfsCommonAbstr * @throws Exception If failed. */ private void startUpSecondary() throws Exception { - IgfsConfiguration igfsCfg = new IgfsConfiguration(); + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); igfsCfg.setDataCacheName("partitioned"); igfsCfg.setMetaCacheName("replicated"); @@ -192,7 +192,7 @@ public class IgniteHadoopFileSystemSecondaryModeSelfTest extends IgfsCommonAbstr cfg.setDiscoverySpi(discoSpi); cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); - cfg.setIgfsConfiguration(igfsCfg); + cfg.setFileSystemConfiguration(igfsCfg); cfg.setLocalHost("127.0.0.1"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index a26ead5..7fda532 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -107,7 +107,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { if (igfsEnabled()) { cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); - cfg.setIgfsConfiguration(igfsConfiguration()); + cfg.setFileSystemConfiguration(igfsConfiguration()); } if (restEnabled()) { @@ -139,8 +139,8 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS configuration. */ - public IgfsConfiguration igfsConfiguration() { - IgfsConfiguration cfg = new IgfsConfiguration(); + public FileSystemConfiguration igfsConfiguration() { + FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setName(igfsName); cfg.setBlockSize(igfsBlockSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 318dfb9..76988a3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.hadoop.mapreduce.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.mapreduce.*; import org.apache.ignite.igfs.secondary.*; @@ -28,7 +29,6 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; @@ -75,7 +75,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes private static final IgniteFileSystem IGFS = new MockIgfs(); /** Planner. */ - private static final HadoopMapReducePlanner PLANNER = new HadoopDefaultMapReducePlanner(); + private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner(); /** Block locations. */ private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>(); @@ -756,7 +756,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes } /** {@inheritDoc} */ - @Override public IgfsConfiguration configuration() { + @Override public FileSystemConfiguration configuration() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index 6b711eb..20c5db2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -72,8 +72,8 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ - @Override public IgfsConfiguration igfsConfiguration() { - IgfsConfiguration cfg = super.igfsConfiguration(); + @Override public FileSystemConfiguration igfsConfiguration() { + FileSystemConfiguration cfg = super.igfsConfiguration(); cfg.setFragmentizerEnabled(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8e2a08b9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1d42eba..9c6ef75 100644 --- a/pom.xml +++ b/pom.xml @@ -731,12 +731,12 @@ <packages>org.apache.ignite.services</packages> </group> <group> - <title>Ignite File System</title> - <packages>org.apache.ignite.igfs</packages> + <title>File System APIs</title> + <packages>org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.secondary</packages> </group> <group> <title>Hadoop Accelerator APIs</title> - <packages>org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce:org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.mapreduce.secondary</packages> + <packages>org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce</packages> </group> <group> <title>Streaming APIs</title> @@ -926,12 +926,12 @@ <packages>org.apache.ignite.services</packages> </group> <group> - <title>Ignite File System</title> - <packages>org.apache.ignite.igfs</packages> + <title>File System APIs</title> + <packages>org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.secondary</packages> </group> <group> <title>Hadoop Accelerator APIs</title> - <packages>org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce:org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.mapreduce.secondary</packages> + <packages>org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce</packages> </group> <group> <title>Streaming APIs</title>