http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java deleted file mode 100644 index 7dffbc3..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Job tracker self test. - */ -public class GridHadoopJobTrackerSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** Test block count parameter name. */ - private static final int BLOCK_CNT = 10; - - /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(GridHadoopJobTrackerSelfTest.class); - - /** Map task execution count. */ - private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); - - /** Reduce task execution count. */ - private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); - - /** Reduce task execution count. 
*/ - private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); - - /** */ - private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>()); - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - latch.put("mapAwaitLatch", new CountDownLatch(1)); - latch.put("reduceAwaitLatch", new CountDownLatch(1)); - latch.put("combineAwaitLatch", new CountDownLatch(1)); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - mapExecCnt.set(0); - combineExecCnt.set(0); - reduceExecCnt.set(0); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setMapReducePlanner(new GridHadoopTestRoundRobinMrPlanner()); - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testSimpleTaskSubmit() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); - - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(0, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTaskWithCombinerPerMap() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setCombinerClass(TestCombiner.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); - - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - // All maps are completed. We have a combiner, so no reducers should be executed - // before combiner latch is released. 
- - U.sleep(50); - - assertEquals(0, reduceExecCnt.get()); - - info("Releasing combiner latch."); - - latch.get("combineAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(10, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * Checks job execution status. - * - * @param jobId Job ID. - * @param complete Completion status. - * @throws Exception If failed. - */ - private void checkStatus(GridHadoopJobId jobId, boolean complete) throws Exception { - for (int i = 0; i < gridCount(); i++) { - IgniteKernal kernal = (IgniteKernal)grid(i); - - GridHadoop hadoop = kernal.hadoop(); - - GridHadoopJobStatus stat = hadoop.status(jobId); - - assert stat != null; - - IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); - - if (!complete) - assertFalse(fut.isDone()); - else { - info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + - kernal.getLocalNodeId() + ']'); - - fut.get(); - } - } - } - - /** - * Test input format - */ - public static class InFormat extends InputFormat { - - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { - List<InputSplit> res = new ArrayList<>(BLOCK_CNT); - - for (int i = 0; i < BLOCK_CNT; i++) - try { - res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); - } - catch (URISyntaxException e) { - throw new IOException(e); - } - - return res; - } - - @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { - return new RecordReader() { - @Override public void initialize(InputSplit split, 
TaskAttemptContext ctx) { - } - - @Override public boolean nextKeyValue() { - return false; - } - - @Override public Object getCurrentKey() { - return null; - } - - @Override public Object getCurrentValue() { - return null; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - - } - }; - } - } - - /** - * Test mapper. - */ - private static class TestMapper extends Mapper { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("mapAwaitLatch").await(); - - mapExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test reducer. - */ - private static class TestReducer extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("reduceAwaitLatch").await(); - - reduceExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test combiner. - */ - private static class TestCombiner extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("combineAwaitLatch").await(); - - combineExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java deleted file mode 100644 index 89318f3..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Tests map-reduce execution with embedded mode. - */ -public class GridHadoopMapReduceEmbeddedSelfTest extends GridHadoopMapReduceTest { - /** */ - private static Map<String, Boolean> flags = GridHadoopSharedMap.map(GridHadoopMapReduceEmbeddedSelfTest.class) - .put("flags", new HashMap<String, Boolean>()); - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * Tests whole job execution with all phases in old and new versions of API with definition of custom - * Serialization, Partitioner and IO formats. - * @throws Exception If fails. 
- */ - public void testMultiReducerWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, - "key6", 18000 ); - - for (int i = 0; i < 2; i++) { - boolean useNewAPI = i == 1; - - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - flags.put("serializationWasConfigured", false); - flags.put("partitionerWasConfigured", false); - flags.put("inputFormatWasConfigured", false); - flags.put("outputFormatWasConfigured", false); - - JobConf jobConf = new JobConf(); - - jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - //To split into about 6-7 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. 
- setupFileSystems(jobConf); - - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); - - if (!useNewAPI) { - jobConf.setPartitionerClass(CustomV1Partitioner.class); - jobConf.setInputFormat(CustomV1InputFormat.class); - jobConf.setOutputFormat(CustomV1OutputFormat.class); - } - - Job job = Job.getInstance(jobConf); - - GridHadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI); - - if (useNewAPI) { - job.setPartitionerClass(CustomV2Partitioner.class); - job.setInputFormatClass(CustomV2InputFormat.class); - job.setOutputFormatClass(CustomV2OutputFormat.class); - } - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setNumReduceTasks(3); - - job.setJarByClass(GridHadoopWordCount2.class); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - - assertTrue("Serialization was configured (new API is " + useNewAPI + ")", - flags.get("serializationWasConfigured")); - - assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", - flags.get("partitionerWasConfigured")); - - assertTrue("Input format was configured (new API is = " + useNewAPI + ")", - flags.get("inputFormatWasConfigured")); - - assertTrue("Output format was configured (new API is = " + useNewAPI + ")", - flags.get("outputFormatWasConfigured")); - - assertEquals("Use new API = " + useNewAPI, - "key3\t15000\n" + - "key6\t18000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") - ); - - assertEquals("Use new API = " + useNewAPI, - "key1\t10000\n" + - "key4\t7000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? 
"part-r-" : "part-") + "00001") - ); - - assertEquals("Use new API = " + useNewAPI, - "key2\t20000\n" + - "key5\t12000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") - ); - - } - } - - /** - * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. - */ - protected static class CustomSerialization extends WritableSerialization { - @Override public void setConf(Configuration conf) { - super.setConf(conf); - - flags.put("serializationWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v1 API. - */ - private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("partitionerWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v2 API. - */ - private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner - implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("partitionerWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v2 API. - */ - private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("inputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of OutputFormat in v2 API. 
- */ - private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("outputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v1 API. - */ - private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - super.configure(job); - - flags.put("inputFormatWasConfigured", true); - } - } - - /** - * Custom implementation of OutputFormat in v1 API. - */ - private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("outputFormatWasConfigured", true); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java deleted file mode 100644 index 6bddf9b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.*; -import org.apache.ignite.hadoop.fs.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Test of whole cycle of map-reduce processing via Job tracker. - */ -public class GridHadoopMapReduceTest extends GridHadoopAbstractWordCountTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * Tests whole job execution with all phases in all combination of new and old versions of API. - * @throws Exception If fails. 
- */ - public void testWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); - - for (int i = 0; i < 8; i++) { - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - boolean useNewMapper = (i & 1) == 0; - boolean useNewCombiner = (i & 2) == 0; - boolean useNewReducer = (i & 4) == 0; - - JobConf jobConf = new JobConf(); - - jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser("yyy"); - jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); - - //To split into about 40 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. - setupFileSystems(jobConf); - - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); - - Job job = Job.getInstance(jobConf); - - GridHadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setJarByClass(GridHadoopWordCount2.class); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - checkJobStatistics(jobId); - - assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, - "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - 
readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") - ); - } - } - - /** - * Simple test job statistics. - * - * @param jobId Job id. - * @throws IgniteCheckedException - */ - private void checkJobStatistics(GridHadoopJobId jobId) throws IgniteCheckedException, IOException { - GridHadoopCounters cntrs = grid(0).hadoop().counters(jobId); - - HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); - - Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); - - Map<String, Integer> phaseOrders = new HashMap<>(); - phaseOrders.put("submit", 0); - phaseOrders.put("prepare", 1); - phaseOrders.put("start", 2); - phaseOrders.put("Cstart", 3); - phaseOrders.put("finish", 4); - - String prevTaskId = null; - - long apiEvtCnt = 0; - - for (T2<String, Long> evt : perfCntr.evts()) { - //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 - String[] parsedEvt = evt.get1().split(" "); - - String taskId; - String taskPhase; - - if ("JOB".equals(parsedEvt[0])) { - taskId = parsedEvt[0]; - taskPhase = parsedEvt[1]; - } - else { - taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; - taskPhase = ("COMBINE".equals(parsedEvt[0]) ? 
"C" : "") + parsedEvt[2]; - } - - if (!taskId.equals(prevTaskId)) - tasks.put(taskId, new TreeMap<Integer,Long>()); - - Integer pos = phaseOrders.get(taskPhase); - - assertNotNull("Invalid phase " + taskPhase, pos); - - tasks.get(taskId).put(pos, evt.get2()); - - prevTaskId = taskId; - - apiEvtCnt++; - } - - for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { - Map<Integer, Long> order = task.getValue(); - - long prev = 0; - - for (Map.Entry<Integer, Long> phase : order.entrySet()) { - assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); - - prev = phase.getValue(); - } - } - - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return igfs.exists(statPath); - } - }, 10000); - - BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); - - assertEquals(apiEvtCnt, GridHadoopTestUtils.simpleCheckJobStatFile(reader)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java deleted file mode 100644 index 23884ef..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * External test for sorting. - */ -public class GridHadoopSortingExternalTest extends GridHadoopSortingTest { - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(true); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java deleted file mode 100644 index 9f107d1..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Tests correct sorting. - */ -public class GridHadoopSortingTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_INPUT = "/test-in"; - - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * @return {@code True} if IGFS is enabled on Hadoop nodes. 
- */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testSortSimple() throws Exception { - // Generate test data. - Job job = Job.getInstance(); - - job.setInputFormatClass(InFormat.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(NullWritable.class); - - job.setMapperClass(Mapper.class); - job.setNumReduceTasks(0); - - setupFileSystems(job.getConfiguration()); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT)); - - X.printerrln("Data generation started."); - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())).get(180000); - - X.printerrln("Data generation complete."); - - // Run main map-reduce job. 
- job = Job.getInstance(); - - setupFileSystems(job.getConfiguration()); - - job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + - "," + WritableSerialization.class.getName()); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setSortComparatorClass(JavaSerializationComparator.class); - - job.setMapperClass(MyMapper.class); - job.setReducerClass(MyReducer.class); - - job.setNumReduceTasks(2); - - job.setMapOutputKeyClass(UUID.class); - job.setMapOutputValueClass(NullWritable.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(NullWritable.class); - - X.printerrln("Job started."); - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(180000); - - X.printerrln("Job complete."); - - // Check result. - Path outDir = new Path(igfsScheme() + PATH_OUTPUT); - - AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration()); - - for (FileStatus file : fs.listStatus(outDir)) { - X.printerrln("__ file: " + file); - - if (file.getLen() == 0) - continue; - - FSDataInputStream in = fs.open(file.getPath()); - - Scanner sc = new Scanner(in); - - UUID prev = null; - - while(sc.hasNextLine()) { - UUID next = UUID.fromString(sc.nextLine()); - -// X.printerrln("___ check: " + next); - - if (prev != null) - assertTrue(prev.compareTo(next) < 0); - - prev = next; - } - } - } - - public static class InFormat extends InputFormat<Text, NullWritable> { - /** {@inheritDoc} */ - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { - List<InputSplit> res = new ArrayList<>(); - - FakeSplit split = new FakeSplit(20); - - for (int i = 0; i < 10; i++) - res.add(split); - - return res; - } - - /** {@inheritDoc} */ - @Override public RecordReader<Text, NullWritable> 
createRecordReader(final InputSplit split, - TaskAttemptContext ctx) throws IOException, InterruptedException { - return new RecordReader<Text, NullWritable>() { - /** */ - int cnt; - - /** */ - Text txt = new Text(); - - @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { - // No-op. - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return ++cnt <= split.getLength(); - } - - @Override public Text getCurrentKey() { - txt.set(UUID.randomUUID().toString()); - -// X.printerrln("___ read: " + txt); - - return txt; - } - - @Override public NullWritable getCurrentValue() { - return NullWritable.get(); - } - - @Override public float getProgress() throws IOException, InterruptedException { - return (float)cnt / split.getLength(); - } - - @Override public void close() { - // No-op. - } - }; - } - } - - public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> { - /** {@inheritDoc} */ - @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException { -// X.printerrln("___ map: " + val); - - ctx.write(UUID.fromString(val.toString()), NullWritable.get()); - } - } - - public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> { - /** */ - private Text text = new Text(); - - /** {@inheritDoc} */ - @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx) - throws IOException, InterruptedException { -// X.printerrln("___ rdc: " + key); - - text.set(key.toString()); - - ctx.write(text, NullWritable.get()); - } - } - - public static class FakeSplit extends InputSplit implements Writable { - /** */ - private static final String[] HOSTS = {"127.0.0.1"}; - - /** */ - private int len; - - /** - * @param len Length. - */ - public FakeSplit(int len) { - this.len = len; - } - - /** - * - */ - public FakeSplit() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public long getLength() throws IOException, InterruptedException { - return len; - } - - /** {@inheritDoc} */ - @Override public String[] getLocations() throws IOException, InterruptedException { - return HOSTS; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - out.writeInt(len); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - len = in.readInt(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java deleted file mode 100644 index 541ed86..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.hadoop.fs.v1.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Tests map-reduce task execution basics. - */ -public class GridHadoopTaskExecutionSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(GridHadoopTaskExecutionSelfTest.class); - - /** Line count. */ - private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger()); - - /** Executed tasks. */ - private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger()); - - /** Cancelled tasks. */ - private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger()); - - /** Working directory of each task. */ - private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs", - new ConcurrentHashMap<String, String>()); - - /** Mapper id to fail. */ - private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger()); - - /** Number of splits of the current input. 
*/ - private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger()); - - /** Test param. */ - private static final String MAP_WRITE = "test.map.write"; - - - /** {@inheritDoc} */ - @Override public IgfsConfiguration igfsConfiguration() { - IgfsConfiguration cfg = super.igfsConfiguration(); - - cfg.setFragmentizerEnabled(false); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - grid(0).fileSystem(igfsName).format(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setMaxParallelTasks(5); - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testMapRun() throws Exception { - int lineCnt = 10000; - String fileName = "/testFile"; - - prepareFile(fileName, lineCnt); - - totalLineCnt.set(0); - taskWorkDirs.clear(); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - - assertEquals(lineCnt, totalLineCnt.get()); - - assertEquals(32, taskWorkDirs.size()); - } - - /** - * @throws Exception If failed. 
- */ - public void testMapCombineRun() throws Exception { - int lineCnt = 10001; - String fileName = "/testFile"; - - prepareFile(fileName, lineCnt); - - totalLineCnt.set(0); - taskWorkDirs.clear(); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); - cfg.setBoolean(MAP_WRITE, true); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - job.setCombinerClass(TestCombiner.class); - job.setReducerClass(TestReducer.class); - - job.setNumReduceTasks(2); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 2); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - assertEquals(lineCnt, totalLineCnt.get()); - - assertEquals(34, taskWorkDirs.size()); - - for (int g = 0; g < gridCount(); g++) - grid(g).hadoop().finishFuture(jobId).get(); - } - - /** - * @throws Exception If failed. 
- */ - public void testMapperException() throws Exception { - prepareFile("/testFile", 1000); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(FailMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 3), - createJobInfo(job.getConfiguration())); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - } - - /** - * @param fileName File name. - * @param lineCnt Line count. - * @throws Exception If failed. - */ - private void prepareFile(String fileName, int lineCnt) throws Exception { - IgniteFs igfs = grid(0).fileSystem(igfsName); - - try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) { - PrintWriter w = new PrintWriter(new OutputStreamWriter(os)); - - for (int i = 0; i < lineCnt; i++) - w.print("Hello, Hadoop map-reduce!\n"); - - w.flush(); - } - } - - /** - * Prepare job with mappers to cancel. - * @return Fully configured job. - * @throws Exception If fails. 
- */ - private Configuration prepareJobForCancelling() throws Exception { - prepareFile("/testFile", 1500); - - executedTasks.set(0); - cancelledTasks.set(0); - failMapperId.set(0); - splitsCount.set(0); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(CancellingTestMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(InFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - return job.getConfiguration(); - } - - /** - * Test input format. - */ - private static class InFormat extends TextInputFormat { - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException { - List<InputSplit> res = super.getSplits(ctx); - - splitsCount.set(res.size()); - - X.println("___ split of input: " + splitsCount.get()); - - return res; - } - } - - /** - * @throws Exception If failed. 
- */ - public void testTaskCancelling() throws Exception { - Configuration cfg = prepareJobForCancelling(); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return splitsCount.get() > 0; - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return executedTasks.get() == splitsCount.get(); - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - // Fail mapper with id "1", cancels others - failMapperId.set(1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - - assertEquals(executedTasks.get(), cancelledTasks.get() + 1); - } - - /** - * @throws Exception If failed. - */ - public void testJobKill() throws Exception { - Configuration cfg = prepareJobForCancelling(); - - GridHadoop hadoop = grid(0).hadoop(); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - //Kill unknown job. - boolean killRes = hadoop.kill(jobId); - - assertFalse(killRes); - - final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg)); - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return splitsCount.get() > 0; - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - X.println("___ executed tasks: " + executedTasks.get()); - - return executedTasks.get() == splitsCount.get(); - } - }, 20000)) { - U.dumpThreads(log); - - fail(); - } - - //Kill really ran job. 
- killRes = hadoop.kill(jobId); - - assertTrue(killRes); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - - assertEquals(executedTasks.get(), cancelledTasks.get()); - - //Kill the same job again. - killRes = hadoop.kill(jobId); - - assertTrue(killRes); - } - - private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> { - private int mapperId; - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - mapperId = executedTasks.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void run(Context ctx) throws IOException, InterruptedException { - try { - super.run(ctx); - } - catch (HadoopTaskCancelledException e) { - cancelledTasks.incrementAndGet(); - - throw e; - } - } - - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - if (mapperId == failMapperId.get()) - throw new IOException(); - - Thread.sleep(1000); - } - } - - /** - * Test failing mapper. - */ - private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> { - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - throw new IOException("Expected"); - } - } - - /** - * Mapper calculates number of lines. - */ - private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** Writable integer constant of '1'. */ - private static final IntWritable ONE = new IntWritable(1); - - /** Line count constant. 
*/ - public static final Text LINE_COUNT = new Text("lineCount"); - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - X.println("___ Mapper: " + ctx.getTaskAttemptID()); - - String taskId = ctx.getTaskAttemptID().toString(); - - LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration()); - - String workDir = locFs.getWorkingDirectory().toString(); - - assertNull(taskWorkDirs.put(workDir, taskId)); - } - - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - if (ctx.getConfiguration().getBoolean(MAP_WRITE, false)) - ctx.write(LINE_COUNT, ONE); - else - totalLineCnt.incrementAndGet(); - } - } - - /** - * Combiner calculates number of lines. - */ - private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { - /** */ - IntWritable sum = new IntWritable(); - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - X.println("___ Combiner: "); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, - InterruptedException { - int lineCnt = 0; - - for (IntWritable value : values) - lineCnt += value.get(); - - sum.set(lineCnt); - - X.println("___ combo: " + lineCnt); - - ctx.write(key, sum); - } - } - - /** - * Combiner calculates number of lines. 
- */ - private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - /** */ - IntWritable sum = new IntWritable(); - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - X.println("___ Reducer: " + ctx.getTaskAttemptID()); - - String taskId = ctx.getTaskAttemptID().toString(); - String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString(); - - assertNull(taskWorkDirs.put(workDir, taskId)); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, - InterruptedException { - int lineCnt = 0; - - for (IntWritable value : values) { - lineCnt += value.get(); - - X.println("___ rdcr: " + value.get()); - } - - sum.set(lineCnt); - - ctx.write(key, sum); - - X.println("___ RDCR SUM: " + lineCnt); - - totalLineCnt.addAndGet(lineCnt); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java deleted file mode 100644 index a959472..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Tests of Map, Combine and Reduce task executions of any version of hadoop API. - */ -abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCountTest { - /** Empty hosts array. */ - private static final String[] HOSTS = new String[0]; - - /** - * Creates some grid hadoop job. Override this method to create tests for any job implementation. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws IOException If fails. - */ - public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; - - /** - * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API - */ - public abstract String getOutputFileNamePrefix(); - - /** - * Tests map task execution. - * - * @throws Exception If fails. 
- */ - @SuppressWarnings("ConstantConditions") - public void testMapTask() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - URI inFileUri = URI.create(igfsScheme() + inFile.toString()); - - try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) { - pw.println("hello0 world0"); - pw.println("world1 hello1"); - } - - GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1); - - try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) { - pw.println("hello2 world2"); - pw.println("world3 hello3"); - } - GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), - igfs.info(inFile).length() - fileBlock1.length()); - - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); - - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); - - HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); - - ctx.mockOutput().clear(); - - ctx.run(); - - assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput())); - - ctx.mockOutput().clear(); - - ctx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); - - ctx.run(); - - assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput())); - } - - /** - * Generates input data for reduce-like operation into mock context input and runs the operation. - * - * @param gridJob Job is to create reduce task from. - * @param taskType Type of task - combine or reduce. - * @param taskNum Number of task in job. - * @param words Pairs of words and its counts. - * @return Context with mock output. - * @throws IgniteCheckedException If fails. 
- */ - private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, GridHadoopTaskType taskType, - int taskNum, String... words) throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); - - HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); - - for (int i = 0; i < words.length; i+=2) { - List<IntWritable> valList = new ArrayList<>(); - - for (int j = 0; j < Integer.parseInt(words[i + 1]); j++) - valList.add(new IntWritable(1)); - - ctx.mockInput().put(new Text(words[i]), valList); - } - - ctx.run(); - - return ctx; - } - - /** - * Tests reduce task execution. - * - * @throws Exception If fails. - */ - public void testReduceTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); - - runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); - runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); - - assertEquals( - "word1\t5\n" + - "word2\t10\n", - readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" + - getOutputFileNamePrefix() + "00000") - ); - - assertEquals( - "word3\t7\n" + - "word4\t15\n", - readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" + - getOutputFileNamePrefix() + "00001") - ); - } - - /** - * Tests combine task execution. - * - * @throws Exception If fails. 
- */ - public void testCombinerTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob("/", "/"); - - HadoopTestTaskContext ctx = - runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); - - assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput())); - - ctx = runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15"); - - assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput())); - } - - /** - * Runs chain of map-combine task on file block. - * - * @param fileBlock block of input file to be processed. - * @param gridJob Hadoop job implementation. - * @return Context of combine task with mock output. - * @throws IgniteCheckedException If fails. - */ - private HadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, HadoopV2Job gridJob) - throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); - - HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob); - - mapCtx.run(); - - //Prepare input for combine - taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); - - HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob); - - combineCtx.makeTreeOfWritables(mapCtx.mockOutput()); - - combineCtx.run(); - - return combineCtx; - } - - /** - * Tests all job in complex. - * Runs 2 chains of map-combine tasks and sends result into one reduce task. - * - * @throws Exception If fails. 
- */ - @SuppressWarnings("ConstantConditions") - public void testAllTasks() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - URI inFileUri = URI.create(igfsScheme() + inFile.toString()); - - generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70); - - //Split file into two blocks - long fileLen = igfs.info(inFile).length(); - - Long l = fileLen / 2; - - GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, l); - GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - - HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); - - HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); - - HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); - - //Prepare input for combine - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); - - HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob); - - reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput()); - reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput()); - - reduceCtx.run(); - - reduceCtx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); - - reduceCtx.run(); - - assertEquals( - "blue\t200\n" + - "green\t150\n" + - "red\t100\n" + - "yellow\t70\n", - readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000") - ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java deleted file mode 100644 index 679be71..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1. - */ -public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest { - /** - * Creates WordCount hadoop job for API v1. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws IOException If fails. 
- */ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { - JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile); - - setupFileSystems(jobConf); - - HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); - - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - - return new HadoopV2Job(jobId, jobInfo, log); - } - - /** {@inheritDoc} */ - @Override public String getOutputFileNamePrefix() { - return "part-"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java deleted file mode 100644 index 4d20b9c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2. - */ -public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest { - /** - * Creates WordCount hadoop job for API v2. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws Exception if fails. - */ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { - Job job = Job.getInstance(); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - GridHadoopWordCount2.setTasksClasses(job, true, true, true); - - Configuration conf = job.getConfiguration(); - - setupFileSystems(conf); - - FileInputFormat.setInputPaths(job, new Path(inFile)); - FileOutputFormat.setOutputPath(job, new Path(outFile)); - - job.setJarByClass(GridHadoopWordCount2.class); - - Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile); - - HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); - - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - - return new HadoopV2Job(jobId, jobInfo, log); - } - - /** {@inheritDoc} */ - @Override public String getOutputFileNamePrefix() { - return "part-r-"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java index f2b9981..a75605b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java @@ -29,7 +29,7 @@ import java.util.*; */ public class GridHadoopTestRoundRobinMrPlanner implements GridHadoopMapReducePlanner { /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, + @Override public GridHadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { if (top.isEmpty()) throw new IllegalArgumentException("Topology is empty"); @@ -37,12 +37,12 @@ public class GridHadoopTestRoundRobinMrPlanner implements GridHadoopMapReducePla // Has at least one element. 
Iterator<ClusterNode> it = top.iterator(); - Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); - for (GridHadoopInputSplit block : job.input()) { + for (HadoopInputSplit block : job.input()) { ClusterNode node = it.next(); - Collection<GridHadoopInputSplit> nodeBlocks = mappers.get(node.id()); + Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); if (nodeBlocks == null) { nodeBlocks = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java deleted file mode 100644 index 051d073..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.configuration.*; - -/** - * Configuration validation tests. - */ -public class GridHadoopValidationSelfTest extends GridHadoopAbstractSelfTest { - /** Peer class loading enabled flag. */ - public boolean peerClassLoading; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - - peerClassLoading = false; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(peerClassLoading); - - return cfg; - } - - /** - * Ensure that Grid starts when all configuration parameters are valid. - * - * @throws Exception If failed. - */ - public void testValid() throws Exception { - startGrids(1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java new file mode 100644 index 0000000..70bf0f2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Abstract class for Hadoop tests. + */ +public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder set on the discovery SPI of every node configured by this class. */ + private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Base REST port; initial value of {@code restPort}. */ + protected static final int REST_PORT = 11212; + + /** IGFS name. */ + protected static final String igfsName = null; + + /** IGFS meta cache name. */ + protected static final String igfsMetaCacheName = "meta"; + + /** IGFS data cache name. */ + protected static final String igfsDataCacheName = "data"; + + /** IGFS block size. */ + protected static final int igfsBlockSize = 1024; + + /** IGFS block group size. */ + protected static final int igfsBlockGroupSize = 8; + + /** REST port for the next REST-enabled node configuration; incremented on each use. */ + private int restPort = REST_PORT; + + /** Initial classpath.
*/ + private static String initCp; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // Add surefire classpath to regular classpath. + initCp = System.getProperty("java.class.path"); + + String surefireCp = System.getProperty("surefire.test.class.path"); + + if (surefireCp != null) + System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + // Restore classpath. + System.setProperty("java.class.path", initCp); + + initCp = null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + if (igfsEnabled()) { + cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); + + cfg.setIgfsConfiguration(igfsConfiguration()); + } + + if (restEnabled()) { + ConnectorConfiguration clnCfg = new ConnectorConfiguration(); + + clnCfg.setPort(restPort++); + + cfg.setConnectorConfiguration(clnCfg); + } + + cfg.setLocalHost("127.0.0.1"); + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** + * @param gridName Grid name (not used by this implementation). + * @return Hadoop configuration with the max parallel tasks limit set to 3. + */ + public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = new GridHadoopConfiguration(); + + cfg.setMaxParallelTasks(3); + + return cfg; + } + + /** + * @return IGFS configuration.
+ */ + public IgfsConfiguration igfsConfiguration() { + IgfsConfiguration cfg = new IgfsConfiguration(); + + cfg.setName(igfsName); + cfg.setBlockSize(igfsBlockSize); + cfg.setDataCacheName(igfsDataCacheName); + cfg.setMetaCacheName(igfsMetaCacheName); + cfg.setFragmentizerEnabled(false); + + return cfg; + } + + /** + * @return IGFS meta cache configuration: replicated, transactional, full-sync writes. + */ + public CacheConfiguration metaCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsMetaCacheName); + cfg.setCacheMode(REPLICATED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return IGFS data cache configuration: partitioned, transactional, full-sync writes, with an {@code IgfsGroupDataBlocksKeyMapper} affinity mapper. + */ + private CacheConfiguration dataCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsDataCacheName); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize)); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return {@code True} if IGFS is enabled on Hadoop nodes; {@code false} unless overridden. + */ + protected boolean igfsEnabled() { + return false; + } + + /** + * @return {@code True} if REST is enabled on Hadoop nodes; {@code false} unless overridden. + */ + protected boolean restEnabled() { + return false; + } + + /** + * @return Number of nodes to start; 3 unless overridden. + */ + protected int gridCount() { + return 3; + } + + /** + * @param cfg Hadoop configuration that receives the default FS URI and the IGFS file system implementation classes. + */ + protected void setupFileSystems(Configuration cfg) { + cfg.set("fs.defaultFS", igfsScheme()); + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem. + class.getName()); + + HadoopFileSystemsUtils.setupFileSystems(cfg); + } + + /** + * @return IGFS file system URI for test, built from the name of test grid 0. + */ + protected String igfsScheme() { + return "igfs://:" + getTestGridName(0) + "@/"; + } +}