Author: nuttycom Date: Wed Sep 22 21:57:09 2010 New Revision: 1000252 URL: http://svn.apache.org/viewvc?rev=1000252&view=rev Log: minor formatting improvements, changed instances of System.out.println to log.debug
Added: commons/sandbox/pipeline/trunk/.gitignore Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Added: commons/sandbox/pipeline/trunk/.gitignore URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/.gitignore?rev=1000252&view=auto ============================================================================== --- commons/sandbox/pipeline/trunk/.gitignore (added) +++ commons/sandbox/pipeline/trunk/.gitignore Wed Sep 22 21:57:09 2010 @@ -0,0 +1,2 @@ +nbproject +target Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff ============================================================================== --- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java (original) +++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/EqualizingDriverControlStrategy.java Wed Sep 22 21:57:09 2010 @@ -14,43 +14,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.commons.pipeline.driver.control; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.commons.pipeline.Stage; /** * An implementation of DriverControlStrategy that measures stage execution times * and increases thread counts that are taking longer than other stages on average - * - * @author mirror */ public class EqualizingDriverControlStrategy implements DriverControlStrategy { - + + Log log = LogFactory.getLog(EqualizingDriverControlStrategy.class); + private static class Tuple { + private int count = 0; private long duration = 0; - - Tuple() { } - + + Tuple() { + } + public void add(long duration) { count++; this.duration += duration; } } - + /** Creates a new instance of EqualizingDriverControlStrategy */ public EqualizingDriverControlStrategy() { } - + public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) { - if (events.isEmpty()) return; - - Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>(); + if (events.isEmpty()) { + return; + } + + Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>(); long total = 0; for (StageProcessTimingEvent ev : events) { Tuple tuple = timings.get((Stage) ev.getSource()); @@ -58,37 +63,37 @@ public class EqualizingDriverControlStra tuple = new Tuple(); timings.put((Stage) ev.getSource(), tuple); } - + tuple.add(ev.getLatency()); total += ev.getLatency(); } - - //System.out.println("Events handled: " + events.size()); - System.out.print("Stage latencies: "); - for (Map.Entry<Stage,Tuple> entry : timings.entrySet()) { - System.out.print(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; "); - } - System.out.println(); - //System.out.println("Total latency: " + total); - + + if (log.isDebugEnabled()) { + log.debug("Events handled: " + events.size()); + log.debug("Stage latencies: "); + for (Map.Entry<Stage, Tuple> entry : timings.entrySet()) { + log.debug(entry.getKey() + ": " + entry.getValue().duration / entry.getValue().count + "; "); + } + log.debug("Total latency: " + total); + } + double mean = total / events.size(); - //System.out.println("Mean latency: " + mean); - + //log.debug("Mean latency: " + mean); + for (PrioritizableStageDriver driver : drivers) { Tuple tuple = timings.get(driver.getStage()); if (tuple != null) { - long averageDuration = tuple.duration / tuple.count; - if (averageDuration > mean + allowableDelta) { - System.out.println("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration); - driver.increasePriority(1); - } else if (averageDuration < mean - allowableDelta) { - driver.decreasePriority(1); - System.out.println("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration); + long averageDuration = tuple.duration / tuple.count; + if (averageDuration > mean + allowableDelta) { + log.debug("Increasing priority for stage " + driver.getStage() + " with average duration " + averageDuration); + driver.increasePriority(1); + } else if (averageDuration < mean - allowableDelta) { + driver.decreasePriority(1); + log.debug("Decreasing priority for stage " + driver.getStage() + " with average duration " + averageDuration); } } } } - /** * Holds value of property allowableDelta. */ @@ -109,5 +114,4 @@ public class EqualizingDriverControlStra public void setAllowableDelta(long allowableDelta) { this.allowableDelta = allowableDelta; } - } Modified: commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java?rev=1000252&r1=1000251&r2=1000252&view=diff ============================================================================== --- commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java (original) +++ commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/driver/control/ExperimentalDriverControlStrategy.java Wed Sep 22 21:57:09 2010 @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.commons.pipeline.driver.control; import java.util.HashMap; @@ -22,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.commons.pipeline.Stage; /** @@ -29,10 +30,11 @@ import org.apache.commons.pipeline.Stage * increases and decreases priorities to see if performance is improved. If * a performance improvement is found, additional experiments are done in the * same direction - * - * @author braeckel */ public class ExperimentalDriverControlStrategy implements DriverControlStrategy { + + private Log log = LogFactory.getLog(ExperimentalDriverControlStrategy.class); + /** * The minimum time difference (in percent) between different analyses of a stage for * modifications to take place. In other words, if a stage is stable with @@ -40,44 +42,57 @@ public class ExperimentalDriverControlSt */ private int minDifferencePercent = 3; - private enum Action { - Decrease { void execute( PrioritizableStageDriver driver ){driver.decreasePriority( 1 ); } }, - Increase { void execute( PrioritizableStageDriver driver ){driver.increasePriority( 1 ); } }, - None { void execute( PrioritizableStageDriver driver ){ /*do nothing*/ } }; + + Decrease { + + void execute(PrioritizableStageDriver driver) { + driver.decreasePriority(1); + } + }, + Increase { + + void execute(PrioritizableStageDriver driver) { + driver.increasePriority(1); + } + }, + None { + + void execute(PrioritizableStageDriver driver) { /*do nothing*/ } + }; abstract void execute(PrioritizableStageDriver driver); } private class Tuple { + private int count = 0; private long duration = 0; private Action lastAction = Action.None; - Tuple() { } + Tuple() { + } public void add(long duration) { count++; this.duration += duration; } } - - private Map<Stage, Tuple> lastTimings = new HashMap<Stage,Tuple>(); + private Map<Stage, Tuple> lastTimings = new HashMap<Stage, Tuple>(); /** Creates a new instance of EqualizingDriverControlStrategy */ public ExperimentalDriverControlStrategy() { } - public ExperimentalDriverControlStrategy( int minDifferencePercent ){ - if( minDifferencePercent < 0 || minDifferencePercent > 100 ) - { - throw new IllegalArgumentException( "Minimum difference percent must be between 0 and 100" ); + public ExperimentalDriverControlStrategy(int minDifferencePercent) { + if (minDifferencePercent < 0 || minDifferencePercent > 100) { + throw new IllegalArgumentException("Minimum difference percent must be between 0 and 100"); } this.minDifferencePercent = minDifferencePercent; } public void handleEvents(List<PrioritizableStageDriver> drivers, List<StageProcessTimingEvent> events) { - Map<Stage, Tuple> timings = new HashMap<Stage,Tuple>(); + Map<Stage, Tuple> timings = new HashMap<Stage, Tuple>(); for (StageProcessTimingEvent ev : events) { Tuple tuple = timings.get(ev.getSource()); if (tuple == null) { @@ -90,92 +105,86 @@ public class ExperimentalDriverControlSt for (PrioritizableStageDriver driver : drivers) { Tuple mostRecentTiming = timings.get(driver.getStage()); - Tuple previousTiming = lastTimings.get( driver.getStage() ); + Tuple previousTiming = lastTimings.get(driver.getStage()); double avgMostRecentDuration = mostRecentTiming.duration / mostRecentTiming.count; //first time around, try increasing priority - if( previousTiming == null ) - { + if (previousTiming == null) { mostRecentTiming.lastAction = Action.Increase; - driver.increasePriority( 1 ); + driver.increasePriority(1); } - if( previousTiming != null ){ + if (previousTiming != null) { double avgPreviousTiming = previousTiming.duration / previousTiming.count; //if the performance has decreased significantly... double timingDifference = avgPreviousTiming - avgMostRecentDuration; - System.out.println( "Performance went from "+avgPreviousTiming + " to "+avgMostRecentDuration +"("+timingDifference+")"); + log.debug("Performance went from " + avgPreviousTiming + " to " + avgMostRecentDuration + "(" + timingDifference + ")"); //if the timing difference was significant enough to work with... double minDifference = avgPreviousTiming * (minDifferencePercent / 100.0); - if( Math.abs( timingDifference ) >= minDifference ) - { + if (Math.abs(timingDifference) >= minDifference) { //if the diff is positive, we have a performance improvement - if( timingDifference > 0 ) - { + if (timingDifference > 0) { //continue whatever we did last time to try and get further //improvement - if( previousTiming.lastAction == Action.Increase ){ - driver.increasePriority( 1 ); + if (previousTiming.lastAction == Action.Increase) { + driver.increasePriority(1); mostRecentTiming.lastAction = Action.Increase; - } - else if( previousTiming.lastAction == Action.Decrease ){ - driver.decreasePriority( 1 ); + } else if (previousTiming.lastAction == Action.Decrease) { + driver.decreasePriority(1); mostRecentTiming.lastAction = Action.Decrease; - } - //there was no last action. Try a random action - else{ - System.out.println( "Significant performance change without a previous action: RANDOM action"); + } //there was no last action. Try a random action + else { + log.debug("Significant performance change without a previous action: RANDOM action"); Action randomAction = getRandomAction(); mostRecentTiming.lastAction = randomAction; - randomAction.execute( driver ); + randomAction.execute(driver); } - } - //there was a performance degradation, reverse our last step - else - { + } //there was a performance degradation, reverse our last step + else { //reverse whatever we did last time to try and get further //improvement - if( previousTiming.lastAction == Action.Increase ){ - driver.decreasePriority( 1 ); + if (previousTiming.lastAction == Action.Increase) { + driver.decreasePriority(1); mostRecentTiming.lastAction = Action.Decrease; - } - else if( previousTiming.lastAction == Action.Decrease ){ - driver.increasePriority( 1 ); + } else if (previousTiming.lastAction == Action.Decrease) { + driver.increasePriority(1); mostRecentTiming.lastAction = Action.Increase; - } - //there was no last action. Try a random action - else{ - System.out.println( "Significant performance change without a previous action: RANDOM action"); + } //there was no last action. Try a random action + else { + log.debug("Significant performance change without a previous action: RANDOM action"); Action randomAction = getRandomAction(); mostRecentTiming.lastAction = randomAction; - randomAction.execute( driver ); + randomAction.execute(driver); } } - } - else{ + } else { mostRecentTiming.lastAction = Action.None; } } - System.out.println( "Action="+mostRecentTiming.lastAction+", current priority="+driver.getPriority() ); + log.debug("Action=" + mostRecentTiming.lastAction + ", current priority=" + driver.getPriority()); //take our most recent timings and roll them into the previous timings - lastTimings.put( driver.getStage(), mostRecentTiming ); + lastTimings.put(driver.getStage(), mostRecentTiming); } } - private Action getRandomAction() - { + private Action getRandomAction() { int val = new Random().nextInt(); - if( val < 0 ) val *= -1; + if (val < 0) { + val *= -1; + } int actionVal = val % 3; - switch( actionVal ){ - case 0: return Action.None; - case 1: return Action.Increase; - case 2: return Action.Decrease; - default: throw new IllegalStateException(); + switch (actionVal) { + case 0: + return Action.None; + case 1: + return Action.Increase; + case 2: + return Action.Decrease; + default: + throw new IllegalStateException(); } } - /** * Holds value of property allowableDelta. */ @@ -196,5 +205,4 @@ public class ExperimentalDriverControlSt public void setAllowableDelta(long allowableDelta) { this.allowableDelta = allowableDelta; } - } Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff ============================================================================== --- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java (original) +++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/driver/control/WallClockThresholdDriverControlStrategyTest.java Wed Sep 22 21:57:09 2010 @@ -20,20 +20,6 @@ package org.apache.commons.pipeline.driv import junit.framework.*; import static org.apache.commons.pipeline.StageDriver.State.*; import static org.apache.commons.pipeline.driver.FaultTolerance.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.pipeline.Feeder; -import org.apache.commons.pipeline.Stage; -import org.apache.commons.pipeline.StageContext; -import org.apache.commons.pipeline.StageDriver; -import org.apache.commons.pipeline.StageException; -import org.apache.commons.pipeline.driver.FaultTolerance; public class WallClockThresholdDriverControlStrategyTest extends TestCase { @@ -55,7 +41,7 @@ public class WallClockThresholdDriverCon public void testCPUBoundControl() throws Exception { - System.out.println( "WallClock: testCPUBoundControl"); + //System.out.println( "WallClock: testCPUBoundControl"); CountingDriverController controller = new CountingDriverController(); controller.setMinimumEventsToHandle( 10 ); controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() ); @@ -67,7 +53,7 @@ public class WallClockThresholdDriverCon public void testIOBoundControl() throws Exception { - System.out.println( "WallClock: testIOBoundControl"); + //System.out.println( "WallClock: testIOBoundControl"); CountingDriverController controller = new CountingDriverController(); controller.setMinimumEventsToHandle( 10 ); controller.setDriverControlStrategy( new WallClockThresholdDriverControlStrategy() ); Modified: commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java URL: http://svn.apache.org/viewvc/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?rev=1000252&r1=1000251&r2=1000252&view=diff ============================================================================== --- commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (original) +++ commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Wed Sep 22 21:57:09 2010 @@ -49,7 +49,7 @@ public class KeyWaitBufferStageTest exte * data waiting for notify() to be called with an appropriate event. */ public void testProcessAndNotify() throws Exception { - System.out.println("notify"); + //System.out.println("processAndNotify"); String obj = "Hello, World!"; KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();