ignite-98
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8fdf79da Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8fdf79da Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8fdf79da Branch: refs/heads/ignite-42 Commit: 8fdf79da491a44f30f69fa8d35068e4bdc0195d4 Parents: 725d79f Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Jan 16 14:43:47 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Jan 16 14:43:47 2015 +0300 ---------------------------------------------------------------------- docs/core-site.gridgain.xml | 21 ++++++++++++++++++++ .../counter/GridHadoopFSCounterWriter.java | 4 ++-- .../counter/GridHadoopPerformanceCounter.java | 18 ++++++++++++++--- .../grid/hadoop/GridHadoopTestUtils.java | 20 +++++++++++++++++-- .../hadoop/GridHadoopCommandLineTest.java | 2 +- .../hadoop/GridHadoopMapReduceTest.java | 5 +++-- 6 files changed, 60 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/docs/core-site.gridgain.xml ---------------------------------------------------------------------- diff --git a/docs/core-site.gridgain.xml b/docs/core-site.gridgain.xml index 56cc7b4..40d4ade 100644 --- a/docs/core-site.gridgain.xml +++ b/docs/core-site.gridgain.xml @@ -65,4 +65,25 @@ <name>dfs.client.block.write.replace-datanode-on-failure.policy</name> <value>NEVER</value> </property> + + <!-- + Allows writing the job statistics into GGFS. + --> + <!-- + <property> + <name>gridgain.counters.writer</name> + <value>org.gridgain.grid.kernal.processors.hadoop.counter.GridHadoopFSCounterWriter</value> + </property> + --> + + <!-- + By default, data is placed into the file /user/<user_name>/<job_id>/performance + You can override this path; the ${USER} macro in it is replaced with the submitter's user name. 
+ --> + <!-- + <property> + <name>gridgain.counters.fswriter.directory</name> + <value>/user/${USER}</value> + </property> + --> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java index 25a6857..c276f5f 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopFSCounterWriter.java @@ -38,13 +38,13 @@ public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter { private static final String DEFAULT_USER_NAME = "anonymous"; /** */ - private static final String COUNTER_WRITER_DIR_PROPERTY = "gridgain.counters.fswriter.directory"; + public static final String COUNTER_WRITER_DIR_PROPERTY = "gridgain.counters.fswriter.directory"; /** */ private static final String USER_MACRO = "${USER}"; /** */ - private static final String DEFAULT_COUNTER_WRITER_DIR = "/users/" + USER_MACRO; + private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO; /** {@inheritDoc} */ @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopPerformanceCounter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopPerformanceCounter.java 
b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopPerformanceCounter.java index c503c42..d95cea3 100644 --- a/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopPerformanceCounter.java +++ b/modules/hadoop/src/main/java/org/gridgain/grid/kernal/processors/hadoop/counter/GridHadoopPerformanceCounter.java @@ -118,9 +118,21 @@ public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter { * @return String contains necessary event information. */ private String eventName(GridHadoopTaskInfo info, String evtType) { + return eventName(info.type().toString(), info.taskNumber(), evtType); + } + + /** + * Generate name that consists of some event information. + * + * @param taskType Task type. + * @param taskNum Number of the task. + * @param evtType The type of the event. + * @return String contains necessary event information. + */ + private String eventName(String taskType, int taskNum, String evtType) { assert nodeId != null; - return info.type() + " " + info.taskNumber() + " " + evtType + " " + nodeId; + return taskType + " " + taskNum + " " + evtType + " " + nodeId; } /** @@ -151,8 +163,8 @@ public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter { */ public void onTaskFinish(GridHadoopTaskInfo info, long ts) { if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) { - evts.add(new T2<>("SHUFFLE " + reducerNum + " start", firstShuffleMsg)); - evts.add(new T2<>("SHUFFLE " + reducerNum + " finish", lastShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg)); + evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg)); lastShuffleMsg = null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/modules/hadoop/src/test/java/org/gridgain/grid/hadoop/GridHadoopTestUtils.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/gridgain/grid/hadoop/GridHadoopTestUtils.java b/modules/hadoop/src/test/java/org/gridgain/grid/hadoop/GridHadoopTestUtils.java index 1feb848..0a5a5e0 100644 --- a/modules/hadoop/src/test/java/org/gridgain/grid/hadoop/GridHadoopTestUtils.java +++ b/modules/hadoop/src/test/java/org/gridgain/grid/hadoop/GridHadoopTestUtils.java @@ -17,6 +17,8 @@ package org.gridgain.grid.hadoop; +import org.gridgain.grid.util.typedef.*; + import java.io.*; import java.util.*; @@ -56,6 +58,8 @@ public class GridHadoopTestUtils { long evtCnt = 0; String line; + Map<Long, String> reduceNodes = new HashMap<>(); + while((line = reader.readLine()) != null) { String[] splitLine = line.split(":"); @@ -71,8 +75,20 @@ public class GridHadoopTestUtils { if ("JOB".equals(evt[0])) phase = evt[1]; else { - //Try parse task number - Long.parseLong(evt[1]); + assertEquals(4, evt.length); + assertTrue("The node id is not defined", !F.isEmpty(evt[3])); + + long taskNum = Long.parseLong(evt[1]); + + if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { + String nodeId = reduceNodes.get(taskNum); + + if (nodeId == null) + reduceNodes.put(taskNum, evt[3]); + else + assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); + } + phase = evt[2]; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopCommandLineTest.java index 38279a9..0f66071 100644 --- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopCommandLineTest.java 
@@ -312,7 +312,7 @@ public class GridHadoopCommandLineTest extends GridCommonAbstractTest { assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); - IgniteFsPath path = new IgniteFsPath("/users/" + System.getProperty("user.name") + "/"); + IgniteFsPath path = new IgniteFsPath("/user/" + System.getProperty("user.name") + "/"); assertTrue(ggfs.exists(path)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8fdf79da/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopMapReduceTest.java index ca14f4c..75620f7 100644 --- a/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/gridgain/grid/kernal/processors/hadoop/GridHadoopMapReduceTest.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.*; import org.apache.ignite.fs.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.hadoop.*; import org.gridgain.grid.kernal.processors.hadoop.counter.*; import org.gridgain.grid.kernal.processors.hadoop.examples.*; @@ -71,6 +70,8 @@ public class GridHadoopMapReduceTest extends GridHadoopAbstractWordCountTest { JobConf jobConf = new JobConf(); jobConf.set(JOB_COUNTER_WRITER_PROPERTY, GridHadoopFSCounterWriter.class.getName()); + jobConf.setUser("yyy"); + jobConf.set(GridHadoopFSCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); //To split into about 40 items for v2 jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); @@ -180,7 +181,7 @@ public class GridHadoopMapReduceTest extends GridHadoopAbstractWordCountTest { } 
} - final IgniteFsPath statPath = new IgniteFsPath("/users/anonymous/" + jobId + "/performance"); + final IgniteFsPath statPath = new IgniteFsPath("/xxx/yyy/zzz/" + jobId + "/performance"); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() {