http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java deleted file mode 100644 index 3aa74d0..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Job tracker self test. - */ -public class GridHadoopJobTrackerSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** Test block count. */ - private static final int BLOCK_CNT = 10; - - /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(GridHadoopJobTrackerSelfTest.class); - - /** Map task execution count. */ - private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); - - /** Reduce task execution count. */ - private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); - - /** Combine task execution count. 
*/ - private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); - - /** */ - private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>()); - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - latch.put("mapAwaitLatch", new CountDownLatch(1)); - latch.put("reduceAwaitLatch", new CountDownLatch(1)); - latch.put("combineAwaitLatch", new CountDownLatch(1)); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - mapExecCnt.set(0); - combineExecCnt.set(0); - reduceExecCnt.set(0); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setMapReducePlanner(new GridHadoopTestRoundRobinMrPlanner()); - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testSimpleTaskSubmit() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); - - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(0, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTaskWithCombinerPerMap() throws Exception { - try { - UUID globalId = UUID.randomUUID(); - - Job job = Job.getInstance(); - setupFileSystems(job.getConfiguration()); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - job.setCombinerClass(TestCombiner.class); - job.setInputFormatClass(InFormat.class); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); - - GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); - - grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - checkStatus(jobId, false); - - info("Releasing map latch."); - - latch.get("mapAwaitLatch").countDown(); - - checkStatus(jobId, false); - - // All maps are completed. We have a combiner, so no reducers should be executed - // before combiner latch is released. 
- - U.sleep(50); - - assertEquals(0, reduceExecCnt.get()); - - info("Releasing combiner latch."); - - latch.get("combineAwaitLatch").countDown(); - - checkStatus(jobId, false); - - info("Releasing reduce latch."); - - latch.get("reduceAwaitLatch").countDown(); - - checkStatus(jobId, true); - - assertEquals(10, mapExecCnt.get()); - assertEquals(10, combineExecCnt.get()); - assertEquals(1, reduceExecCnt.get()); - } - finally { - // Safety. - latch.get("mapAwaitLatch").countDown(); - latch.get("combineAwaitLatch").countDown(); - latch.get("reduceAwaitLatch").countDown(); - } - } - - /** - * Checks job execution status. - * - * @param jobId Job ID. - * @param complete Completion status. - * @throws Exception If failed. - */ - private void checkStatus(GridHadoopJobId jobId, boolean complete) throws Exception { - for (int i = 0; i < gridCount(); i++) { - IgniteKernal kernal = (IgniteKernal)grid(i); - - GridHadoop hadoop = kernal.hadoop(); - - GridHadoopJobStatus stat = hadoop.status(jobId); - - assert stat != null; - - IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); - - if (!complete) - assertFalse(fut.isDone()); - else { - info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + - kernal.getLocalNodeId() + ']'); - - fut.get(); - } - } - } - - /** - * Test input format - */ - public static class InFormat extends InputFormat { - - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { - List<InputSplit> res = new ArrayList<>(BLOCK_CNT); - - for (int i = 0; i < BLOCK_CNT; i++) - try { - res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); - } - catch (URISyntaxException e) { - throw new IOException(e); - } - - return res; - } - - @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { - return new RecordReader() { - @Override public void initialize(InputSplit split, 
TaskAttemptContext ctx) { - } - - @Override public boolean nextKeyValue() { - return false; - } - - @Override public Object getCurrentKey() { - return null; - } - - @Override public Object getCurrentValue() { - return null; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - - } - }; - } - } - - /** - * Test mapper. - */ - private static class TestMapper extends Mapper { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("mapAwaitLatch").await(); - - mapExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test reducer. - */ - private static class TestReducer extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("reduceAwaitLatch").await(); - - reduceExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } - - /** - * Test combiner. - */ - private static class TestCombiner extends Reducer { - @Override public void run(Context ctx) throws IOException, InterruptedException { - System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); - - latch.get("combineAwaitLatch").await(); - - combineExecCnt.incrementAndGet(); - - System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java deleted file mode 100644 index dda041c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Tests map-reduce execution with embedded mode. - */ -public class GridHadoopMapReduceEmbeddedSelfTest extends GridHadoopMapReduceTest { - /** */ - private static Map<String, Boolean> flags = GridHadoopSharedMap.map(GridHadoopMapReduceEmbeddedSelfTest.class) - .put("flags", new HashMap<String, Boolean>()); - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * Tests whole job execution with all phases in old and new versions of API with definition of custom - * Serialization, Partitioner and IO formats. - * @throws Exception If fails. 
- */ - public void testMultiReducerWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, - "key6", 18000 ); - - for (int i = 0; i < 2; i++) { - boolean useNewAPI = i == 1; - - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - flags.put("serializationWasConfigured", false); - flags.put("partitionerWasConfigured", false); - flags.put("inputFormatWasConfigured", false); - flags.put("outputFormatWasConfigured", false); - - JobConf jobConf = new JobConf(); - - jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - //To split into about 6-7 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. 
- setupFileSystems(jobConf); - - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); - - if (!useNewAPI) { - jobConf.setPartitionerClass(CustomV1Partitioner.class); - jobConf.setInputFormat(CustomV1InputFormat.class); - jobConf.setOutputFormat(CustomV1OutputFormat.class); - } - - Job job = Job.getInstance(jobConf); - - GridHadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI); - - if (useNewAPI) { - job.setPartitionerClass(CustomV2Partitioner.class); - job.setInputFormatClass(CustomV2InputFormat.class); - job.setOutputFormatClass(CustomV2OutputFormat.class); - } - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setNumReduceTasks(3); - - job.setJarByClass(GridHadoopWordCount2.class); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - - assertTrue("Serialization was configured (new API is " + useNewAPI + ")", - flags.get("serializationWasConfigured")); - - assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", - flags.get("partitionerWasConfigured")); - - assertTrue("Input format was configured (new API is = " + useNewAPI + ")", - flags.get("inputFormatWasConfigured")); - - assertTrue("Output format was configured (new API is = " + useNewAPI + ")", - flags.get("outputFormatWasConfigured")); - - assertEquals("Use new API = " + useNewAPI, - "key3\t15000\n" + - "key6\t18000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") - ); - - assertEquals("Use new API = " + useNewAPI, - "key1\t10000\n" + - "key4\t7000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? 
"part-r-" : "part-") + "00001") - ); - - assertEquals("Use new API = " + useNewAPI, - "key2\t20000\n" + - "key5\t12000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") - ); - - } - } - - /** - * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. - */ - protected static class CustomSerialization extends WritableSerialization { - @Override public void setConf(Configuration conf) { - super.setConf(conf); - - flags.put("serializationWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v1 API. - */ - private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("partitionerWasConfigured", true); - } - } - - /** - * Custom implementation of Partitioner in v2 API. - */ - private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner - implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("partitionerWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v2 API. - */ - private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("inputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of OutputFormat in v2 API. 
- */ - private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - flags.put("outputFormatWasConfigured", true); - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - } - - /** - * Custom implementation of InputFormat in v1 API. - */ - private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - super.configure(job); - - flags.put("inputFormatWasConfigured", true); - } - } - - /** - * Custom implementation of OutputFormat in v1 API. - */ - private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable { - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - flags.put("outputFormatWasConfigured", true); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java deleted file mode 100644 index 072e764..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Test of whole cycle of map-reduce processing via Job tracker. - */ -public class GridHadoopMapReduceTest extends GridHadoopAbstractWordCountTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * Tests whole job execution with all phases in all combination of new and old versions of API. - * @throws Exception If fails. 
- */ - public void testWholeMapReduceExecution() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); - - for (int i = 0; i < 8; i++) { - igfs.delete(new IgfsPath(PATH_OUTPUT), true); - - boolean useNewMapper = (i & 1) == 0; - boolean useNewCombiner = (i & 2) == 0; - boolean useNewReducer = (i & 4) == 0; - - JobConf jobConf = new JobConf(); - - jobConf.set(JOB_COUNTER_WRITER_PROPERTY, GridHadoopFSCounterWriter.class.getName()); - jobConf.setUser("yyy"); - jobConf.set(GridHadoopFSCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); - - //To split into about 40 items for v2 - jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); - - //For v1 - jobConf.setInt("fs.local.block.size", 65000); - - // File system coordinates. - setupFileSystems(jobConf); - - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); - - Job job = Job.getInstance(jobConf); - - GridHadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setJarByClass(GridHadoopWordCount2.class); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - checkJobStatistics(jobId); - - assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, - "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - readAndSortFile(PATH_OUTPUT 
+ "/" + (useNewReducer ? "part-r-" : "part-") + "00000") - ); - } - } - - /** - * Simple test job statistics. - * - * @param jobId Job id. - * @throws IgniteCheckedException - */ - private void checkJobStatistics(GridHadoopJobId jobId) throws IgniteCheckedException, IOException { - GridHadoopCounters cntrs = grid(0).hadoop().counters(jobId); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null); - - Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); - - Map<String, Integer> phaseOrders = new HashMap<>(); - phaseOrders.put("submit", 0); - phaseOrders.put("prepare", 1); - phaseOrders.put("start", 2); - phaseOrders.put("Cstart", 3); - phaseOrders.put("finish", 4); - - String prevTaskId = null; - - long apiEvtCnt = 0; - - for (T2<String, Long> evt : perfCntr.evts()) { - //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 - String[] parsedEvt = evt.get1().split(" "); - - String taskId; - String taskPhase; - - if ("JOB".equals(parsedEvt[0])) { - taskId = parsedEvt[0]; - taskPhase = parsedEvt[1]; - } - else { - taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; - taskPhase = ("COMBINE".equals(parsedEvt[0]) ? 
"C" : "") + parsedEvt[2]; - } - - if (!taskId.equals(prevTaskId)) - tasks.put(taskId, new TreeMap<Integer,Long>()); - - Integer pos = phaseOrders.get(taskPhase); - - assertNotNull("Invalid phase " + taskPhase, pos); - - tasks.get(taskId).put(pos, evt.get2()); - - prevTaskId = taskId; - - apiEvtCnt++; - } - - for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { - Map<Integer, Long> order = task.getValue(); - - long prev = 0; - - for (Map.Entry<Integer, Long> phase : order.entrySet()) { - assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); - - prev = phase.getValue(); - } - } - - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return igfs.exists(statPath); - } - }, 10000); - - BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); - - assertEquals(apiEvtCnt, GridHadoopTestUtils.simpleCheckJobStatFile(reader)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java deleted file mode 100644 index 3e8a95a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.collect.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; -import java.util.Map.*; - -import static com.google.common.collect.Maps.*; -import static com.google.common.collect.MinMaxPriorityQueue.*; -import static java.util.Collections.*; - -/** - * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than - * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are - * output. - * - * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system - * path. - */ -public class GridHadoopPopularWordsTest { - /** Ignite home. */ - private static final String IGNITE_HOME = U.getIgniteHome(); - - /** The path to the input directory. All files in that directory will be processed. 
*/ - private static final Path BOOKS_LOCAL_DIR = - new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books"); - - /** The path to the output directory. The result file will be written to this location. */ - private static final Path RESULT_LOCAL_DIR = - new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output"); - - /** Popular books source dir in DFS. */ - private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in"); - - /** Job results dir in DFS. */ - private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out"); - - /** Path to the distributed file system configuration. */ - private static final String DFS_CFG = "examples/config/filesystem/core-site.xml"; - - /** Top N words to select **/ - private static final int POPULAR_WORDS_CNT = 10; - - /** - * For each token in the input string the mapper emits a {word, 1} pair. - */ - private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> { - /** Constant value. */ - private static final IntWritable ONE = new IntWritable(1); - - /** The word converted into the Text. */ - private Text word = new Text(); - - /** - * Emits an entry where the key is the word and the value is always 1. - * - * @param key the current position in the input file (not used here) - * @param val the text string - * @param ctx mapper context - * @throws IOException - * @throws InterruptedException - */ - @Override protected void map(LongWritable key, Text val, Context ctx) - throws IOException, InterruptedException { - // Get the mapped object. - final String line = val.toString(); - - // Splits the given string to words. - final String[] words = line.split("[^a-zA-Z0-9]"); - - for (final String w : words) { - // Only emit counts for longer words. - if (w.length() <= 3) - continue; - - word.set(w); - - // Write the word into the context with the initial count equal to 1. 
- ctx.write(word, ONE); - } - } - } - - /** - * The reducer uses a priority queue to rank the words based on its number of occurrences. - */ - private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - private MinMaxPriorityQueue<Entry<Integer, String>> q; - - TopNWordsReducer() { - q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() { - @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) { - return o1.getKey().compareTo(o2.getKey()); - } - })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create(); - } - - /** - * This method doesn't emit anything, but just keeps track of the top N words. - * - * @param key The word. - * @param vals The words counts. - * @param ctx Reducer context. - * @throws IOException If failed. - * @throws InterruptedException If failed. - */ - @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, - InterruptedException { - int sum = 0; - - for (IntWritable val : vals) - sum += val.get(); - - q.add(immutableEntry(sum, key.toString())); - } - - /** - * This method is called after all the word entries have been processed. It writes the accumulated - * statistics to the job output file. - * - * @param ctx The job context. - * @throws IOException If failed. - * @throws InterruptedException If failed. - */ - @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { - IntWritable i = new IntWritable(); - - Text txt = new Text(); - - // iterate in desc order - while (!q.isEmpty()) { - Entry<Integer, String> e = q.removeFirst(); - - i.set(e.getKey()); - - txt.set(e.getValue()); - - ctx.write(txt, i); - } - } - } - - /** - * Configures the Hadoop MapReduce job. - * - * @return Instance of the Hadoop MapRed job. - * @throws IOException If failed. 
- */ - private Job createConfigBasedHadoopJob() throws IOException { - Job jobCfg = new Job(); - - Configuration cfg = jobCfg.getConfiguration(); - - // Use explicit configuration of distributed file system, if provided. - if (DFS_CFG != null) - cfg.addResource(U.resolveIgniteUrl(DFS_CFG)); - - jobCfg.setJobName("HadoopPopularWordExample"); - jobCfg.setJarByClass(GridHadoopPopularWordsTest.class); - jobCfg.setInputFormatClass(TextInputFormat.class); - jobCfg.setOutputKeyClass(Text.class); - jobCfg.setOutputValueClass(IntWritable.class); - jobCfg.setMapperClass(TokenizingMapper.class); - jobCfg.setReducerClass(TopNWordsReducer.class); - - FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR); - FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR); - - // Local job tracker allows the only task per wave, but text input format - // replaces it with the calculated value based on input split size option. - if ("local".equals(cfg.get("mapred.job.tracker", "local"))) { - // Split job into tasks using 32MB split size. - FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024); - FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE); - } - - return jobCfg; - } - - /** - * Runs the Hadoop job. - * - * @return {@code True} if succeeded, {@code false} otherwise. - * @throws Exception If failed. - */ - private boolean runWordCountConfigBasedHadoopJob() throws Exception { - Job job = createConfigBasedHadoopJob(); - - // Distributed file system this job will work with. - FileSystem fs = FileSystem.get(job.getConfiguration()); - - X.println(">>> Using distributed file system: " + fs.getHomeDirectory()); - - // Prepare input and output job directories. - prepareDirectories(fs); - - long time = System.currentTimeMillis(); - - // Run job. - boolean res = job.waitForCompletion(true); - - X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec."); - - // Move job results into local file system, so you can view calculated results. 
- publishResults(fs); - - return res; - } - - /** - * Prepare job's data: cleanup result directories that might have left over - * after previous runs, copy input files from the local file system into DFS. - * - * @param fs Distributed file system to use in job. - * @throws IOException If failed. - */ - private void prepareDirectories(FileSystem fs) throws IOException { - X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR); - - fs.delete(RESULT_DFS_DIR, true); - - X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); - - fs.delete(BOOKS_DFS_DIR, true); - - X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR); - - fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR); - } - - /** - * Publish job execution results into local file system, so you can view them. - * - * @param fs Distributed file sytem used in job. - * @throws IOException If failed. - */ - private void publishResults(FileSystem fs) throws IOException { - X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); - - fs.delete(BOOKS_DFS_DIR, true); - - X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR); - - fs.delete(RESULT_LOCAL_DIR, true); - - X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR); - - fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR); - } - - /** - * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of - * occurrences of the word in the source files, the N most popular words are selected. - * - * @param args None. 
- */ - public static void main(String[] args) { - try { - new GridHadoopPopularWordsTest().runWordCountConfigBasedHadoopJob(); - } - catch (Exception e) { - X.println(">>> Failed to run word count example: " + e.getMessage()); - } - - System.exit(0); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java deleted file mode 100644 index 79b9965..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; - -/** - * Test of wrapper of the native serialization. - */ -public class GridHadoopSerializationWrapperSelfTest extends GridCommonAbstractTest { - /** - * Tests read/write of IntWritable via native WritableSerialization. - * @throws Exception If fails. - */ - public void testIntWritableSerialization() throws Exception { - GridHadoopSerialization ser = new GridHadoopSerializationWrapper(new WritableSerialization(), IntWritable.class); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - DataOutput out = new DataOutputStream(buf); - - ser.write(out, new IntWritable(3)); - ser.write(out, new IntWritable(-5)); - - assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray())); - - DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); - - assertEquals(3, ((IntWritable)ser.read(in, null)).get()); - assertEquals(-5, ((IntWritable)ser.read(in, null)).get()); - } - - /** - * Tests read/write of Integer via native JavaleSerialization. - * @throws Exception If fails. 
- */ - public void testIntJavaSerialization() throws Exception { - GridHadoopSerialization ser = new GridHadoopSerializationWrapper(new JavaSerialization(), Integer.class); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - DataOutput out = new DataOutputStream(buf); - - ser.write(out, 3); - ser.write(out, -5); - ser.close(); - - DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); - - assertEquals(3, ((Integer)ser.read(in, null)).intValue()); - assertEquals(-5, ((Integer)ser.read(in, null)).intValue()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java deleted file mode 100644 index 689fb58..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.jdk8.backport.*; - -import java.util.concurrent.*; - -/** - * For tests. - */ -public class GridHadoopSharedMap { - /** */ - private static final ConcurrentMap<String, GridHadoopSharedMap> maps = new ConcurrentHashMap8<>(); - - /** */ - private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>(); - - /** - * Private. - */ - private GridHadoopSharedMap() { - // No-op. - } - - /** - * Puts object by key. - * - * @param key Key. - * @param val Value. - */ - public <T> T put(String key, T val) { - Object old = map.putIfAbsent(key, val); - - return old == null ? val : (T)old; - } - - /** - * @param cls Class. - * @return Map of static fields. - */ - public static GridHadoopSharedMap map(Class<?> cls) { - GridHadoopSharedMap m = maps.get(cls.getName()); - - if (m != null) - return m; - - GridHadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new GridHadoopSharedMap()); - - return old == null ? m : old; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java deleted file mode 100644 index 23884ef..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -/** - * External test for sorting. - */ -public class GridHadoopSortingExternalTest extends GridHadoopSortingTest { - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(true); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java deleted file mode 100644 index 3a2c397..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Tests correct sorting. - */ -public class GridHadoopSortingTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_INPUT = "/test-in"; - - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** - * @return {@code True} if IGFS is enabled on Hadoop nodes. 
- */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testSortSimple() throws Exception { - // Generate test data. - Job job = Job.getInstance(); - - job.setInputFormatClass(InFormat.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(NullWritable.class); - - job.setMapperClass(Mapper.class); - job.setNumReduceTasks(0); - - setupFileSystems(job.getConfiguration()); - - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT)); - - X.printerrln("Data generation started."); - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())).get(180000); - - X.printerrln("Data generation complete."); - - // Run main map-reduce job. 
- job = Job.getInstance(); - - setupFileSystems(job.getConfiguration()); - - job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + - "," + WritableSerialization.class.getName()); - - FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - - job.setSortComparatorClass(JavaSerializationComparator.class); - - job.setMapperClass(MyMapper.class); - job.setReducerClass(MyReducer.class); - - job.setNumReduceTasks(2); - - job.setMapOutputKeyClass(UUID.class); - job.setMapOutputValueClass(NullWritable.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(NullWritable.class); - - X.printerrln("Job started."); - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(180000); - - X.printerrln("Job complete."); - - // Check result. - Path outDir = new Path(igfsScheme() + PATH_OUTPUT); - - AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration()); - - for (FileStatus file : fs.listStatus(outDir)) { - X.printerrln("__ file: " + file); - - if (file.getLen() == 0) - continue; - - FSDataInputStream in = fs.open(file.getPath()); - - Scanner sc = new Scanner(in); - - UUID prev = null; - - while(sc.hasNextLine()) { - UUID next = UUID.fromString(sc.nextLine()); - -// X.printerrln("___ check: " + next); - - if (prev != null) - assertTrue(prev.compareTo(next) < 0); - - prev = next; - } - } - } - - public static class InFormat extends InputFormat<Text, NullWritable> { - /** {@inheritDoc} */ - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { - List<InputSplit> res = new ArrayList<>(); - - FakeSplit split = new FakeSplit(20); - - for (int i = 0; i < 10; i++) - res.add(split); - - return res; - } - - /** {@inheritDoc} */ - @Override public RecordReader<Text, NullWritable> 
createRecordReader(final InputSplit split, - TaskAttemptContext ctx) throws IOException, InterruptedException { - return new RecordReader<Text, NullWritable>() { - /** */ - int cnt; - - /** */ - Text txt = new Text(); - - @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { - // No-op. - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return ++cnt <= split.getLength(); - } - - @Override public Text getCurrentKey() { - txt.set(UUID.randomUUID().toString()); - -// X.printerrln("___ read: " + txt); - - return txt; - } - - @Override public NullWritable getCurrentValue() { - return NullWritable.get(); - } - - @Override public float getProgress() throws IOException, InterruptedException { - return (float)cnt / split.getLength(); - } - - @Override public void close() { - // No-op. - } - }; - } - } - - public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> { - /** {@inheritDoc} */ - @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException { -// X.printerrln("___ map: " + val); - - ctx.write(UUID.fromString(val.toString()), NullWritable.get()); - } - } - - public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> { - /** */ - private Text text = new Text(); - - /** {@inheritDoc} */ - @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx) - throws IOException, InterruptedException { -// X.printerrln("___ rdc: " + key); - - text.set(key.toString()); - - ctx.write(text, NullWritable.get()); - } - } - - public static class FakeSplit extends InputSplit implements Writable { - /** */ - private static final String[] HOSTS = {"127.0.0.1"}; - - /** */ - private int len; - - /** - * @param len Length. - */ - public FakeSplit(int len) { - this.len = len; - } - - /** - * - */ - public FakeSplit() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public long getLength() throws IOException, InterruptedException { - return len; - } - - /** {@inheritDoc} */ - @Override public String[] getLocations() throws IOException, InterruptedException { - return HOSTS; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - out.writeInt(len); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - len = in.readInt(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java deleted file mode 100644 index 0b15a2c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Self test of {@link GridHadoopSplitWrapper}. - */ -public class GridHadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest { - /** - * Tests serialization of wrapper and the wrapped native split. - * @throws Exception If fails. - */ - public void testSerialization() throws Exception { - FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"}); - - assertEquals("/path/to/file:100+500", nativeSplit.toString()); - - GridHadoopSplitWrapper split = GridHadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations()); - - assertEquals("[host1, host2]", Arrays.toString(split.hosts())); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - ObjectOutput out = new ObjectOutputStream(buf); - - out.writeObject(split); - - ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray())); - - final GridHadoopSplitWrapper res = (GridHadoopSplitWrapper)in.readObject(); - - assertEquals("/path/to/file:100+500", GridHadoopUtils.unwrapSplit(res).toString()); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - res.hosts(); - - return null; - } - }, AssertionError.class, null); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java deleted file mode 100644 index 6cc7635..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopStartup.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem; -import org.apache.ignite.internal.util.typedef.*; - -/** - * Hadoop node startup. - */ -public class GridHadoopStartup { - /** - * @param args Arguments. - */ - public static void main(String[] args) { - G.start("config/hadoop/default-config.xml"); - } - - /** - * @return Configuration for job run. 
- */ - @SuppressWarnings("UnnecessaryFullyQualifiedName") - public static Configuration configuration() { - Configuration cfg = new Configuration(); - - cfg.set("fs.defaultFS", "igfs://igfs@localhost"); - - cfg.set("fs.igfs.impl", org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem.class.getName()); - cfg.set("fs.AbstractFileSystem.igfs.impl", IgfsHadoopFileSystem.class.getName()); - - cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); - - cfg.set("mapreduce.framework.name", "ignite"); - cfg.set("mapreduce.jobtracker.address", "localhost:11211"); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java deleted file mode 100644 index 40546bb..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.hadoop.v1.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Tests map-reduce task execution basics. - */ -public class GridHadoopTaskExecutionSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(GridHadoopTaskExecutionSelfTest.class); - - /** Line count. */ - private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger()); - - /** Executed tasks. */ - private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger()); - - /** Cancelled tasks. */ - private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger()); - - /** Working directory of each task. */ - private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs", - new ConcurrentHashMap<String, String>()); - - /** Mapper id to fail. */ - private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger()); - - /** Number of splits of the current input. 
*/ - private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger()); - - /** Test param. */ - private static final String MAP_WRITE = "test.map.write"; - - - /** {@inheritDoc} */ - @Override public IgfsConfiguration igfsConfiguration() { - IgfsConfiguration cfg = super.igfsConfiguration(); - - cfg.setFragmentizerEnabled(false); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - grid(0).fileSystem(igfsName).format(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setMaxParallelTasks(5); - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testMapRun() throws Exception { - int lineCnt = 10000; - String fileName = "/testFile"; - - prepareFile(fileName, lineCnt); - - totalLineCnt.set(0); - taskWorkDirs.clear(); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgfsHadoopFileSystem.class.getName()); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - - assertEquals(lineCnt, totalLineCnt.get()); - - assertEquals(32, taskWorkDirs.size()); - } - - /** - * @throws Exception If failed. 
- */ - public void testMapCombineRun() throws Exception { - int lineCnt = 10001; - String fileName = "/testFile"; - - prepareFile(fileName, lineCnt); - - totalLineCnt.set(0); - taskWorkDirs.clear(); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgfsHadoopFileSystem.class.getName()); - cfg.setBoolean(MAP_WRITE, true); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - job.setCombinerClass(TestCombiner.class); - job.setReducerClass(TestReducer.class); - - job.setNumReduceTasks(2); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 2); - - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); - - fut.get(); - - assertEquals(lineCnt, totalLineCnt.get()); - - assertEquals(34, taskWorkDirs.size()); - - for (int g = 0; g < gridCount(); g++) - grid(g).hadoop().finishFuture(jobId).get(); - } - - /** - * @throws Exception If failed. 
- */ - public void testMapperException() throws Exception { - prepareFile("/testFile", 1000); - - Configuration cfg = new Configuration(); - - cfg.setStrings("fs.igfs.impl", IgfsHadoopFileSystem.class.getName()); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(FailMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 3), - createJobInfo(job.getConfiguration())); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - } - - /** - * @param fileName File name. - * @param lineCnt Line count. - * @throws Exception If failed. - */ - private void prepareFile(String fileName, int lineCnt) throws Exception { - IgniteFs igfs = grid(0).fileSystem(igfsName); - - try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) { - PrintWriter w = new PrintWriter(new OutputStreamWriter(os)); - - for (int i = 0; i < lineCnt; i++) - w.print("Hello, Hadoop map-reduce!\n"); - - w.flush(); - } - } - - /** - * Prepare job with mappers to cancel. - * @return Fully configured job. - * @throws Exception If fails. 
- */ - private Configuration prepareJobForCancelling() throws Exception { - prepareFile("/testFile", 1500); - - executedTasks.set(0); - cancelledTasks.set(0); - failMapperId.set(0); - splitsCount.set(0); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(CancellingTestMapper.class); - - job.setNumReduceTasks(0); - - job.setInputFormatClass(InFormat.class); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); - - job.setJarByClass(getClass()); - - return job.getConfiguration(); - } - - /** - * Test input format. - */ - private static class InFormat extends TextInputFormat { - @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException { - List<InputSplit> res = super.getSplits(ctx); - - splitsCount.set(res.size()); - - X.println("___ split of input: " + splitsCount.get()); - - return res; - } - } - - /** - * @throws Exception If failed. 
- */ - public void testTaskCancelling() throws Exception { - Configuration cfg = prepareJobForCancelling(); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return splitsCount.get() > 0; - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return executedTasks.get() == splitsCount.get(); - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - // Fail mapper with id "1", cancels others - failMapperId.set(1); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - - assertEquals(executedTasks.get(), cancelledTasks.get() + 1); - } - - /** - * @throws Exception If failed. - */ - public void testJobKill() throws Exception { - Configuration cfg = prepareJobForCancelling(); - - GridHadoop hadoop = grid(0).hadoop(); - - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); - - //Kill unknown job. - boolean killRes = hadoop.kill(jobId); - - assertFalse(killRes); - - final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg)); - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return splitsCount.get() > 0; - } - }, 20000)) { - U.dumpThreads(log); - - assertTrue(false); - } - - if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - X.println("___ executed tasks: " + executedTasks.get()); - - return executedTasks.get() == splitsCount.get(); - } - }, 20000)) { - U.dumpThreads(log); - - fail(); - } - - //Kill really ran job. 
- killRes = hadoop.kill(jobId); - - assertTrue(killRes); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - fut.get(); - - return null; - } - }, IgniteCheckedException.class, null); - - assertEquals(executedTasks.get(), cancelledTasks.get()); - - //Kill the same job again. - killRes = hadoop.kill(jobId); - - assertTrue(killRes); - } - - private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> { - private int mapperId; - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - mapperId = executedTasks.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void run(Context ctx) throws IOException, InterruptedException { - try { - super.run(ctx); - } - catch (GridHadoopTaskCancelledException e) { - cancelledTasks.incrementAndGet(); - - throw e; - } - } - - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - if (mapperId == failMapperId.get()) - throw new IOException(); - - Thread.sleep(1000); - } - } - - /** - * Test failing mapper. - */ - private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> { - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - throw new IOException("Expected"); - } - } - - /** - * Mapper calculates number of lines. - */ - private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** Writable integer constant of '1'. */ - private static final IntWritable ONE = new IntWritable(1); - - /** Line count constant. 
*/ - public static final Text LINE_COUNT = new Text("lineCount"); - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - X.println("___ Mapper: " + ctx.getTaskAttemptID()); - - String taskId = ctx.getTaskAttemptID().toString(); - - LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration()); - - String workDir = locFs.getWorkingDirectory().toString(); - - assertNull(taskWorkDirs.put(workDir, taskId)); - } - - /** {@inheritDoc} */ - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - if (ctx.getConfiguration().getBoolean(MAP_WRITE, false)) - ctx.write(LINE_COUNT, ONE); - else - totalLineCnt.incrementAndGet(); - } - } - - /** - * Combiner calculates number of lines. - */ - private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { - /** */ - IntWritable sum = new IntWritable(); - - /** {@inheritDoc} */ - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - X.println("___ Combiner: "); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, - InterruptedException { - int lineCnt = 0; - - for (IntWritable value : values) - lineCnt += value.get(); - - sum.set(lineCnt); - - X.println("___ combo: " + lineCnt); - - ctx.write(key, sum); - } - } - - /** - * Combiner calculates number of lines. 
*/
    private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /** Reusable holder for the sum written for a key. */
        private final IntWritable sum = new IntWritable();

        /** {@inheritDoc} */
        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
            X.println("___ Reducer: " + ctx.getTaskAttemptID());

            String taskId = ctx.getTaskAttemptID().toString();
            String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();

            // Each task must see a working directory that no other task has registered.
            assertNull(taskWorkDirs.put(workDir, taskId));
        }

        /** {@inheritDoc} */
        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
            InterruptedException {
            int lineCnt = 0;

            for (IntWritable value : values) {
                lineCnt += value.get();

                X.println("___ rdcr: " + value.get());
            }

            sum.set(lineCnt);

            ctx.write(key, sum);

            X.println("___ RDCR SUM: " + lineCnt);

            // Add this key's sum to the shared total line counter.
            totalLineCnt.addAndGet(lineCnt);
        }
    }
}