Liron Aravot has uploaded a new change for review. Change subject: core: add executor for execution of runnables with CountDownLatch ......................................................................
core: add executor for execution of runnables with CountDownLatch This patch adds LatchedRunnableExecuter to be able to execute LatchedRunnableWrapper (pre existing class) instances using CountDownLatch. 1. LatchedRunnableExecuter basically could have been implemented as class whose instances cannot be created and with static methods only - it's has been implemented like this in order to provide proper testing (creating instances of it is rare, so it shouldn't matter in terms of garbage collection), a singleton isn't used as well as it's being used only from Reconstruct flow which is rare, so I prefer to create instance when needed instead of having permanent instance of it. 2. Handle of rejected submission of runnables was removed last patch because its abnormal situation that we shouldn't reach at all in that case according to infra so there's no need to submit runnable again if it failed. 3. Handle of InterruptedException : right now it's caught only as it's being taken care of in other places in the code. Change-Id: Ia56ff4afac01803ac0753dbf88fcfbfd34e2e56d Signed-off-by: Liron Aravot <lara...@redhat.com> --- A backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java M backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java 2 files changed, 167 insertions(+), 26 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/54/10354/1 diff --git a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java new file mode 100644 index 0000000..2cd4cac --- /dev/null +++ b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java @@ -0,0 +1,47 @@ +package org.ovirt.engine.core.utils.thread; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil; + +public class LatchedRunnableExecutor { + + private List<Runnable> runnables; + + /** + * @param runnables + * - list of runnables for execution + */ + public LatchedRunnableExecutor(List<Runnable> runnables) { + this.runnables = runnables; + } + + protected void executeRunnable(LatchedRunnableWrapper runnable) { + ThreadPoolUtil.execute(runnable); + } + + protected LatchedRunnableWrapper createLatchedRunnableWrapper(Runnable runnable, CountDownLatch latch) { + return new LatchedRunnableWrapper(runnable, latch); + } + + protected CountDownLatch createCountDownLatch() { + return new CountDownLatch(runnables.size()); + } + + /** + * executes the list of Runnable provided to this executer during creations and waits till the execution of all + * runnables is done. + */ + public void execute() { + CountDownLatch latch = createCountDownLatch(); + for (Runnable runnable : runnables) { + LatchedRunnableWrapper latchWrapper = createLatchedRunnableWrapper(runnable, latch); + executeRunnable(latchWrapper); + } + try { + latch.await(); + } catch (InterruptedException e) { + } + } +} diff --git a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java index fafc679..8c633fe 100644 --- a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java +++ b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java @@ -3,30 +3,46 @@ */ package org.ovirt.engine.core.utils.thread; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; +import org.mockito.exceptions.base.MockitoException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class LatchedRunnableWrapperTest { + private static final int THREADS_NUMBER = 20; private AtomicInteger counter; + private LatchedRunnableExecutor latchedRunnableExecuter; + private CountDownLatch latch; + private ExecutorService threadPool; + private RunnableCreator runnableCreator; private interface RunnableCreator { Runnable createRunnable(); } - private class DummyRunnable implements Runnable { - - - public DummyRunnable() { - } - @Override public void run() { counter.incrementAndGet(); @@ -36,34 +52,112 @@ @Before public void setup() { counter = new AtomicInteger(); - } - - /** - * - */ - @Test - public void latchedRunnableWrapperTest() { - final int threadsNumber = 100; - runThreads(threadsNumber, new RunnableCreator() { + threadPool = Executors.newFixedThreadPool(THREADS_NUMBER); + runnableCreator = new RunnableCreator() { @Override public Runnable createRunnable() { return new DummyRunnable(); } - }); - assertEquals(threadsNumber, counter.intValue()); + }; } - private void runThreads(final int threadsNumber, RunnableCreator runnableCreator) { - ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadsNumber); - CountDownLatch latch = new CountDownLatch(threadsNumber); - for (int index = 0; index < threadsNumber; index++) { - fixedThreadPool.execute(new LatchedRunnableWrapper(runnableCreator.createRunnable(), latch)); - } + @Test + public void regularExecution() { + prepareMocks(THREADS_NUMBER); + latchedRunnableExecuter.execute(); + assertEquals("the counter wasn't incremented the expected number of times", THREADS_NUMBER, counter.intValue()); + verifyCommonExecutionChecks(); + } + + @Test + public void submitFullFailure() { + boolean gotException = false; + prepareMocks(0); try { - latch.await(); + latchedRunnableExecuter.execute(); + } catch (RejectedExecutionException e) { + gotException = true; + } + assertTrue("expected RejectedExecutionException wasn't thrown", gotException); + assertEquals("the counter was incremented more times then expected", 0, counter.intValue()); + assertEquals("latch counter wasn't in the expected value", THREADS_NUMBER, latch.getCount()); + verifyCommonFailureChecks(); + } + + @Test + public void submitPartialFailure() { + int expectedToRun = THREADS_NUMBER - 5; + prepareMocks(expectedToRun); + boolean gotException = false; + try { + latchedRunnableExecuter.execute(); + } catch (RejectedExecutionException e) { + gotException = true; + } + assertTrue("expected RejectedExecutionException wasn't thrown", gotException); + assertFalse("the counter wasn't incremented the expected number of times", expectedToRun < counter.intValue()); + assertTrue("latch counter value was lower than expected", latch.getCount() > 0); + assertTrue("latch counter value was greater than expected", latch.getCount() < THREADS_NUMBER); + verifyCommonFailureChecks(); + } + + /** + * @param runnableCreator + * @param isSubmitRetry + * @param isExecuteOnFirstRun + */ + private void prepareMocks(final int countToExecute) { + List<Runnable> runnables = new LinkedList<Runnable>(); + for (int index = 0; index < THREADS_NUMBER; index++) { + runnables.add(runnableCreator.createRunnable()); + } + + latchedRunnableExecuter = spy(new LatchedRunnableExecutor(runnables)); + latch = spy(latchedRunnableExecuter.createCountDownLatch()); + + doReturn(latch).when(latchedRunnableExecuter).createCountDownLatch(); + + final HashSet<Runnable> executedRunnables = new HashSet<Runnable>(); + + doAnswer(new Answer<LatchedRunnableWrapper>() { + @Override + public LatchedRunnableWrapper answer(InvocationOnMock invocation) throws Throwable { + final LatchedRunnableWrapper toReturn = + new LatchedRunnableWrapper((Runnable) invocation.getArguments()[0], + (CountDownLatch) invocation.getArguments()[1]); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (executedRunnables.size() < countToExecute) { + threadPool.execute(toReturn); + executedRunnables.add(toReturn); + } else { + throw new RejectedExecutionException(); + } + return null; + } + }).when(latchedRunnableExecuter).executeRunnable(toReturn); + return toReturn; + } + }).when(latchedRunnableExecuter).createLatchedRunnableWrapper(any(Runnable.class), any(CountDownLatch.class)); + } + + private void verifyCommonExecutionChecks() { + verify(latch, times(THREADS_NUMBER)).countDown(); + assertEquals("latch counter value wasn't in the expected value", 0, latch.getCount()); + try { + verify(latch, times(1)).await(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new MockitoException(e.toString()); + } + } + + private void verifyCommonFailureChecks() { + try { + verify(latch, never()).await(); + } catch (InterruptedException e) { + throw new MockitoException(e.toString()); } } } -- To view, visit http://gerrit.ovirt.org/10354 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia56ff4afac01803ac0753dbf88fcfbfd34e2e56d Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Liron Aravot <lara...@redhat.com> _______________________________________________ Engine-patches mailing list Engine-patches@ovirt.org http://lists.ovirt.org/mailman/listinfo/engine-patches