http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java new file mode 100644 index 0000000..2e2b5cb --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -0,0 +1,1005 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.hadoop.planner.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +/** + * + */ +public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSelfTest { + /** */ + private static final UUID ID_1 = new UUID(0, 1); + + /** */ + private static final UUID ID_2 = new UUID(0, 2); + + /** */ + private static final UUID ID_3 = new UUID(0, 3); + + /** */ + private static final String HOST_1 = "host1"; + + /** */ + private static final String HOST_2 = "host2"; + + /** */ + private static final String HOST_3 = "host3"; + + /** */ + private static final String INVALID_HOST_1 = "invalid_host1"; + + /** */ + private static final String INVALID_HOST_2 = "invalid_host2"; + + /** */ + private static final String INVALID_HOST_3 = "invalid_host3"; + + /** Mocked Grid. */ + private static final MockIgnite GRID = new MockIgnite(); + + /** Mocked IGFS. */ + private static final IgniteFs IGFS = new MockIgfs(); + + /** Planner. */ + private static final GridHadoopMapReducePlanner PLANNER = new HadoopDefaultMapReducePlanner(); + + /** Block locations. */ + private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>(); + + /** Proxy map. */ + private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>(); + + /** Last created plan. */ + private static final ThreadLocal<GridHadoopMapReducePlan> PLAN = new ThreadLocal<>(); + + /** + * + */ + static { + GridTestUtils.setFieldValue(PLANNER, "ignite", GRID); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + GridTestUtils.setFieldValue(PLANNER, "log", log()); + + BLOCK_MAP.clear(); + PROXY_MAP.clear(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testIgfsOneBlockPerNode() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); + GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); + GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); + mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 2); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); + GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); + GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 2); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureMappers(ID_3, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); + GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); + GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); + mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); + GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); + GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + + plan(3, split1, split2, split3); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); + GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); + + mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); + mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); + + plan(1, split1); + assert ensureMappers(ID_1, split1); + assert ensureReducers(ID_1, 1); + assert ensureEmpty(ID_2); + assert ensureEmpty(ID_3); + + plan(1, split2); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_1); + assert ensureEmpty(ID_3); + + plan(1, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0); + assert ensureEmpty(ID_3); + + plan(2, split1, split2); + assert ensureMappers(ID_1, split1); + assert ensureMappers(ID_2, split2); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureEmpty(ID_3); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testNonIgfsOrphans() throws IgniteCheckedException { + GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); + GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); + GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); + + plan(1, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1); + + plan(2, split1); + assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) || + ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2); + + plan(1, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) || + ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) || + ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1); + + plan(3, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1); + assert ensureReducers(ID_2, 1); + assert ensureReducers(ID_3, 1); + + plan(5, split1, split2, split3); + assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || + ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || + ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); + assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || + ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); + } + + /** + * Create plan. + * + * @param reducers Reducers count. + * @param splits Splits. + * @return Plan. + * @throws IgniteCheckedException If failed. + */ + private static GridHadoopMapReducePlan plan(int reducers, GridHadoopInputSplit... splits) throws IgniteCheckedException { + assert reducers > 0; + assert splits != null && splits.length > 0; + + Collection<GridHadoopInputSplit> splitList = new ArrayList<>(splits.length); + + Collections.addAll(splitList, splits); + + Collection<ClusterNode> top = new ArrayList<>(); + + GridTestNode node1 = new GridTestNode(ID_1); + GridTestNode node2 = new GridTestNode(ID_2); + GridTestNode node3 = new GridTestNode(ID_3); + + node1.setHostName(HOST_1); + node2.setHostName(HOST_2); + node3.setHostName(HOST_3); + + top.add(node1); + top.add(node2); + top.add(node3); + + GridHadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); + + PLAN.set(plan); + + return plan; + } + + /** + * Ensure that node contains the given mappers. + * + * @param nodeId Node ID. + * @param expSplits Expected splits. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureMappers(UUID nodeId, GridHadoopInputSplit... expSplits) { + Collection<GridHadoopInputSplit> expSplitsCol = new ArrayList<>(); + + Collections.addAll(expSplitsCol, expSplits); + + Collection<GridHadoopInputSplit> splits = PLAN.get().mappers(nodeId); + + return F.eq(expSplitsCol, splits); + } + + /** + * Ensure that node contains the given amount of reducers. + * + * @param nodeId Node ID. + * @param reducers Reducers. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureReducers(UUID nodeId, int reducers) { + int[] reducersArr = PLAN.get().reducers(nodeId); + + return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers); + } + + /** + * Ensure that no mappers and reducers is located on this node. + * + * @param nodeId Node ID. + * @return {@code True} if this assumption is valid. + */ + private static boolean ensureEmpty(UUID nodeId) { + return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId)); + } + + /** + * Create split. + * + * @param igfs IGFS flag. + * @param file File. + * @param start Start. + * @param len Length. + * @param hosts Hosts. + * @return Split. + */ + private static GridHadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { + URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file); + + return new GridHadoopFileBlock(hosts, uri, start, len); + } + + /** + * Create block location. + * + * @param start Start. + * @param len Length. + * @param nodeIds Node IDs. + * @return Block location. + */ + private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) { + assert nodeIds != null && nodeIds.length > 0; + + Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length); + + for (UUID id : nodeIds) + nodes.add(new GridTestNode(id)); + + return new IgfsBlockLocationImpl(start, len, nodes); + } + + /** + * Map IGFS block to nodes. + * + * @param file File. + * @param start Start. + * @param len Length. + * @param locations Locations. + */ + private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) { + assert locations != null && locations.length > 0; + + IgfsPath path = new IgfsPath(file); + + Block block = new Block(path, start, len); + + Collection<IgfsBlockLocation> locationsList = new ArrayList<>(); + + Collections.addAll(locationsList, locations); + + BLOCK_MAP.put(block, locationsList); + } + + /** + * Block. + */ + private static class Block { + /** */ + private final IgfsPath path; + + /** */ + private final long start; + + /** */ + private final long len; + + /** + * Constructor. + * + * @param path Path. + * @param start Start. + * @param len Length. + */ + private Block(IgfsPath path, long start, long len) { + this.path = path; + this.start = start; + this.len = len; + } + + /** {@inheritDoc} */ + @SuppressWarnings("RedundantIfStatement") + @Override public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Block)) return false; + + Block block = (Block) o; + + if (len != block.len) + return false; + + if (start != block.start) + return false; + + if (!path.equals(block.path)) + return false; + + return true; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = path.hashCode(); + + res = 31 * res + (int) (start ^ (start >>> 32)); + res = 31 * res + (int) (len ^ (len >>> 32)); + + return res; + } + } + + /** + * Mocked job. + */ + private static class MockJob implements GridHadoopJob { + /** Reducers count. */ + private final int reducers; + + /** */ + private Collection<GridHadoopInputSplit> splitList; + + /** + * Constructor. + * + * @param reducers Reducers count. + * @param splitList Splits. + */ + private MockJob(int reducers, Collection<GridHadoopInputSplit> splitList) { + this.reducers = reducers; + this.splitList = splitList; + } + + /** {@inheritDoc} */ + @Override public GridHadoopJobId id() { + return null; + } + + /** {@inheritDoc} */ + @Override public GridHadoopJobInfo info() { + return new HadoopDefaultJobInfo() { + @Override public int reducers() { + return reducers; + } + }; + } + + /** {@inheritDoc} */ + @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException { + return splitList; + } + + /** {@inheritDoc} */ + @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void dispose(boolean external) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + // No-op. + } + } + + /** + * Mocked IGFS. + */ + private static class MockIgfs implements IgfsEx { + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { + return BLOCK_MAP.get(new Block(path, start, len)); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, + long maxLen) { + return null; + } + + /** {@inheritDoc} */ + @Override public void stop() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgfsContext context() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPaths proxyPaths() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsStatus globalSpace() throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean globalSampling() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsLocalMetrics localMetrics() { + return null; + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public String clientLogDirectory() { + return null; + } + + /** {@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evictExclude(IgfsPath path, boolean primary) { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsConfiguration configuration() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile info(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary summary(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + return null; + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + return false; + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + return null; + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public Map<String, String> properties() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, + @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, boolean create) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + return null; + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgfsMetrics metrics() { + return null; + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long size(IgfsPath path) { + return 0; + } + + /** {@inheritDoc} */ + @Override public void format() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid nextAffinityKey() { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFs withAsync() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + return null; + } + } + + /** + * Mocked Grid. + */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + private static class MockIgnite extends IgniteSpringBean implements IgniteEx { + /** {@inheritDoc} */ + @Override public IgniteClusterEx cluster() { + return (IgniteClusterEx)super.cluster(); + } + + /** {@inheritDoc} */ + @Override public IgniteFs igfsx(String name) { + assert F.eq("igfs", name); + + return IGFS; + } + + /** {@inheritDoc} */ + @Override public GridHadoop hadoop() { + return null; + } + + /** {@inheritDoc} */ + @Override public String name() { + return null; + } + + /** {@inheritDoc} */ + @Override public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls, + Class<V> valCls) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> GridCache<K, V> cachex(@Nullable String name) { + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public <K, V> GridCache<K, V> cachex() { + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Collection<GridCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super GridCache<?, ?>>... p) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean eventUserRecordable(int type) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean allEventsUserRecordable(int[] types) { + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<String> compatibleVersions() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isJmxRemoteEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isRestartEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override public ClusterNode localNode() { + return null; + } + + /** {@inheritDoc} */ + @Override public String latestVersion() { + return null; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java index 88dfd2b..8b5d2b6 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java @@ -54,9 +54,9 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract GridHadoopTaskContext taskCtx = new TaskContext(); - GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, mapSize); + HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize); - GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); Multimap<Integer, Integer> mm = ArrayListMultimap.create(); Multimap<Integer, Integer> vis = ArrayListMultimap.create(); @@ -90,7 +90,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract assertEquals(0, mem.allocatedSize()); } - private void check(GridHadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, + private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception { final GridHadoopTaskInput in = m.input(taskCtx); @@ -129,7 +129,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract final GridDataInput dataInput = new GridUnsafeDataInput(); - m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() { + m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { /** */ IntWritable key = new IntWritable(); @@ -184,7 +184,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract final GridHadoopTaskContext taskCtx = new TaskContext(); - final GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, 16); + final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16); final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); @@ -199,7 +199,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract IntWritable key = new IntWritable(); IntWritable val = new IntWritable(); - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); + HadoopMultimap.Adder a = m.startAdding(taskCtx); for (int i = 0; i < 50000; i++) { int k = rnd.nextInt(32000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java index 92177ad..90d957b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java @@ -89,9 +89,9 @@ public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest { GridHadoopTaskContext taskCtx = new TaskContext(); - final GridHadoopHashMultimap m = new GridHadoopHashMultimap(new JobInfo(), mem, mapSize); + final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize); - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); + HadoopMultimap.Adder a = m.startAdding(taskCtx); Multimap<Integer, Integer> mm = ArrayListMultimap.create(); @@ -124,7 +124,7 @@ public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest { assertEquals(0, mem.allocatedSize()); } - private void check(GridHadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception { + private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception { final GridHadoopTaskInput in = m.input(taskCtx); Map<Integer, Collection<Integer>> mmm = mm.asMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java index 6ba00ad..3d930ff 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java @@ -47,7 +47,7 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { int all = 10000; for (int i = 0; i < all; i++) { - int level = GridHadoopSkipList.randomLevel(rnd); + int level = HadoopSkipList.randomLevel(rnd); levelsCnts[level]++; } @@ -86,9 +86,9 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { GridHadoopTaskContext taskCtx = new TaskContext(); - GridHadoopMultimap m = new GridHadoopSkipList(job, mem); + HadoopMultimap m = new HadoopSkipList(job, mem); - GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); Multimap<Integer, Integer> mm = ArrayListMultimap.create(); Multimap<Integer, Integer> vis = ArrayListMultimap.create(); @@ -122,7 +122,7 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { assertEquals(0, mem.allocatedSize()); } - private void check(GridHadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) + private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception { final GridHadoopTaskInput in = m.input(taskCtx); @@ -165,7 +165,7 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { final GridDataInput dataInput = new GridUnsafeDataInput(); - m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() { + m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { /** */ IntWritable key = new IntWritable(); @@ -220,7 +220,7 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { final GridHadoopTaskContext taskCtx = new TaskContext(); - final GridHadoopMultimap m = new GridHadoopSkipList(job, mem); + final HadoopMultimap m = new HadoopSkipList(job, mem); final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); @@ -235,7 +235,7 @@ public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { IntWritable key = new IntWritable(); IntWritable val = new IntWritable(); - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); + HadoopMultimap.Adder a = m.startAdding(taskCtx); for (int i = 0; i < 50000; i++) { int k = rnd.nextInt(32000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java index 39a537b..98475fb 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java @@ -31,7 +31,7 @@ public class GridHadoopDataStreamSelfTest extends GridCommonAbstractTest { public void testStreams() throws IOException { GridUnsafeMemory mem = new GridUnsafeMemory(0); - GridHadoopDataOutStream out = new GridHadoopDataOutStream(mem); + HadoopDataOutStream out = new HadoopDataOutStream(mem); int size = 4 * 1024; @@ -86,7 +86,7 @@ public class GridHadoopDataStreamSelfTest extends GridCommonAbstractTest { out.write(new byte[]{0,1,2,3}, 1, 2); out.writeUTF("mom washes rum"); - GridHadoopDataInStream in = new GridHadoopDataInStream(mem); + HadoopDataInStream in = new HadoopDataInStream(mem); in.buffer().set(ptr, out.buffer().pointer()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java deleted file mode 100644 index c97b6ab..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridHadoopExecutorServiceTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testExecutesAll() throws Exception { - final GridHadoopExecutorService exec = new GridHadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - for (int i = 0; i < 5; i++) { - final int loops = 5000; - int threads = 17; - - final LongAdder sum = new LongAdder(); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - for (int i = 0; i < loops; i++) { - exec.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, threads); - - while (exec.active() != 0) { - X.println("__ active: " + exec.active()); - - Thread.sleep(200); - } - - assertEquals(threads * loops, sum.sum()); - - X.println("_ ok"); - } - - assertTrue(exec.shutdown(0)); - } - - /** - * @throws Exception If failed. - */ - public void testShutdown() throws Exception { - for (int i = 0; i < 5; i++) { - final GridHadoopExecutorService exec = new GridHadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - final LongAdder sum = new LongAdder(); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finish.get()) { - exec.submit(new Callable<Void>() { - @Override public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, 19); - - Thread.sleep(200); - - assertTrue(exec.shutdown(50)); - - long res = sum.sum(); - - assertTrue(res > 0); - - finish.set(true); - - fut.get(); - - assertEquals(res, sum.sum()); // Nothing was executed after shutdown. - - X.println("_ ok"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java new file mode 100644 index 0000000..aa50fa9 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class HadoopExecutorServiceTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testExecutesAll() throws Exception { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + for (int i = 0; i < 5; i++) { + final int loops = 5000; + int threads = 17; + + final LongAdder sum = new LongAdder(); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < loops; i++) { + exec.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, threads); + + while (exec.active() != 0) { + X.println("__ active: " + exec.active()); + + Thread.sleep(200); + } + + assertEquals(threads * loops, sum.sum()); + + X.println("_ ok"); + } + + assertTrue(exec.shutdown(0)); + } + + /** + * @throws Exception If failed. + */ + public void testShutdown() throws Exception { + for (int i = 0; i < 5; i++) { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + final LongAdder sum = new LongAdder(); + + final AtomicBoolean finish = new AtomicBoolean(); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finish.get()) { + exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, 19); + + Thread.sleep(200); + + assertTrue(exec.shutdown(50)); + + long res = sum.sum(); + + assertTrue(res > 0); + + finish.set(true); + + fut.get(); + + assertEquals(res, sum.sum()); // Nothing was executed after shutdown. + + X.println("_ ok"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java index a725ddc..dd3c5d4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java @@ -125,7 +125,7 @@ public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) { + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { assert msg instanceof TestMessage; msgs.add((TestMessage)msg); @@ -134,7 +134,7 @@ public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) { + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { // No-op. } @@ -157,7 +157,7 @@ public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractT /** * */ - private static class TestMessage implements GridHadoopMessage { + private static class TestMessage implements HadoopMessage { /** From index. */ private int from; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index f599ed9..822ab8f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -83,7 +83,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(GridHadoopValidationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopDefaultMapReducePlannerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopJobTrackerSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopHashMapSelfTest.class.getName())));