Liron Aravot has uploaded a new change for review.

Change subject: core: add executor for execution of runnables with 
CountDownLatch
......................................................................

core: add executor for execution of runnables with CountDownLatch

This patch adds LatchedRunnableExecutor, which executes
LatchedRunnableWrapper (a pre-existing class) instances using a
CountDownLatch.

1. LatchedRunnableExecutor could basically have been implemented as a
class whose instances cannot be created, with static methods only.
It has been implemented as a regular instantiable class instead in
order to allow proper testing (creating instances of it is rare, so it
shouldn't matter in terms of garbage collection). A singleton isn't
used either, because the class is used only from the Reconstruct flow,
which is rare, so I prefer to create an instance when needed instead of
keeping a permanent instance of it.

2. Handling of rejected submission of runnables was removed in the last
patch because, according to infra, it's an abnormal situation that we
shouldn't reach at all in that case, so there's no need to submit a
runnable again if its submission failed.

3. Handling of InterruptedException: right now it's only caught (and
not handled further), as it's being taken care of in other places in
the code.

Change-Id: Ia56ff4afac01803ac0753dbf88fcfbfd34e2e56d
Signed-off-by: Liron Aravot <lara...@redhat.com>
---
A 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
M 
backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
2 files changed, 167 insertions(+), 26 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/54/10354/1

diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
new file mode 100644
index 0000000..2cd4cac
--- /dev/null
+++ 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
@@ -0,0 +1,47 @@
+package org.ovirt.engine.core.utils.thread;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil;
+
+public class LatchedRunnableExecutor {
+
+    private List<Runnable> runnables;
+
+    /**
+     * @param runnables
+     *            - list of runnables for execution
+     */
+    public LatchedRunnableExecutor(List<Runnable> runnables) {
+        this.runnables = runnables;
+    }
+
+    protected void executeRunnable(LatchedRunnableWrapper runnable) {
+        ThreadPoolUtil.execute(runnable);
+    }
+
+    protected LatchedRunnableWrapper createLatchedRunnableWrapper(Runnable 
runnable, CountDownLatch latch) {
+        return new LatchedRunnableWrapper(runnable, latch);
+    }
+
+    protected CountDownLatch createCountDownLatch() {
+        return new CountDownLatch(runnables.size());
+    }
+
+    /**
+     * executes the list of Runnable provided to this executer during 
creations and waits till the execution of all
+     * runnables is done.
+     */
+    public void execute() {
+        CountDownLatch latch = createCountDownLatch();
+        for (Runnable runnable : runnables) {
+            LatchedRunnableWrapper latchWrapper = 
createLatchedRunnableWrapper(runnable, latch);
+            executeRunnable(latchWrapper);
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+        }
+    }
+}
diff --git 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
index fafc679..8c633fe 100644
--- 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
+++ 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
@@ -3,30 +3,46 @@
  */
 package org.ovirt.engine.core.utils.thread;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
-import static org.junit.Assert.assertEquals;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.exceptions.base.MockitoException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class LatchedRunnableWrapperTest {
+    private static final int THREADS_NUMBER = 20;
 
     private AtomicInteger counter;
+    private LatchedRunnableExecutor latchedRunnableExecuter;
+    private CountDownLatch latch;
+    private ExecutorService threadPool;
+    private RunnableCreator runnableCreator;
 
     private interface RunnableCreator {
         Runnable createRunnable();
     }
 
-
     private class DummyRunnable implements Runnable {
-
-
-        public DummyRunnable() {
-        }
-
         @Override
         public void run() {
             counter.incrementAndGet();
@@ -36,34 +52,112 @@
     @Before
     public void setup() {
         counter = new AtomicInteger();
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void latchedRunnableWrapperTest() {
-        final int threadsNumber = 100;
-        runThreads(threadsNumber, new RunnableCreator() {
+        threadPool = Executors.newFixedThreadPool(THREADS_NUMBER);
+        runnableCreator = new RunnableCreator() {
 
             @Override
             public Runnable createRunnable() {
                 return new DummyRunnable();
             }
-        });
-        assertEquals(threadsNumber, counter.intValue());
+        };
     }
 
-    private void runThreads(final int threadsNumber, RunnableCreator 
runnableCreator) {
-        ExecutorService fixedThreadPool = 
Executors.newFixedThreadPool(threadsNumber);
-        CountDownLatch latch = new CountDownLatch(threadsNumber);
-        for (int index = 0; index < threadsNumber; index++) {
-            fixedThreadPool.execute(new 
LatchedRunnableWrapper(runnableCreator.createRunnable(), latch));
-        }
+    @Test
+    public void regularExecution() {
+        prepareMocks(THREADS_NUMBER);
+        latchedRunnableExecuter.execute();
+        assertEquals("the counter wasn't incremented the expected number of 
times", THREADS_NUMBER, counter.intValue());
+        verifyCommonExecutionChecks();
+    }
+
+    @Test
+    public void submitFullFailure() {
+        boolean gotException = false;
+        prepareMocks(0);
         try {
-            latch.await();
+            latchedRunnableExecuter.execute();
+        } catch (RejectedExecutionException e) {
+            gotException = true;
+        }
+        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
+        assertEquals("the counter was incremented more times then expected", 
0, counter.intValue());
+        assertEquals("latch counter wasn't in the expected value", 
THREADS_NUMBER, latch.getCount());
+        verifyCommonFailureChecks();
+    }
+
+    @Test
+    public void submitPartialFailure() {
+        int expectedToRun = THREADS_NUMBER - 5;
+        prepareMocks(expectedToRun);
+        boolean gotException = false;
+        try {
+            latchedRunnableExecuter.execute();
+        } catch (RejectedExecutionException e) {
+            gotException = true;
+        }
+        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
+        assertFalse("the counter wasn't incremented the expected number of 
times", expectedToRun < counter.intValue());
+        assertTrue("latch counter value was lower than expected", 
latch.getCount() > 0);
+        assertTrue("latch counter value was greater than expected", 
latch.getCount() < THREADS_NUMBER);
+        verifyCommonFailureChecks();
+    }
+
+    /**
+     * @param runnableCreator
+     * @param isSubmitRetry
+     * @param isExecuteOnFirstRun
+     */
+    private void prepareMocks(final int countToExecute) {
+        List<Runnable> runnables = new LinkedList<Runnable>();
+        for (int index = 0; index < THREADS_NUMBER; index++) {
+            runnables.add(runnableCreator.createRunnable());
+        }
+
+        latchedRunnableExecuter = spy(new LatchedRunnableExecutor(runnables));
+        latch = spy(latchedRunnableExecuter.createCountDownLatch());
+
+        doReturn(latch).when(latchedRunnableExecuter).createCountDownLatch();
+
+        final HashSet<Runnable> executedRunnables = new HashSet<Runnable>();
+
+        doAnswer(new Answer<LatchedRunnableWrapper>() {
+            @Override
+            public LatchedRunnableWrapper answer(InvocationOnMock invocation) 
throws Throwable {
+                final LatchedRunnableWrapper toReturn =
+                        new LatchedRunnableWrapper((Runnable) 
invocation.getArguments()[0],
+                                (CountDownLatch) invocation.getArguments()[1]);
+                doAnswer(new Answer<Void>() {
+                    @Override
+                    public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                        if (executedRunnables.size() < countToExecute) {
+                            threadPool.execute(toReturn);
+                            executedRunnables.add(toReturn);
+                        } else {
+                            throw new RejectedExecutionException();
+                        }
+                        return null;
+                    }
+                }).when(latchedRunnableExecuter).executeRunnable(toReturn);
+                return toReturn;
+            }
+        
}).when(latchedRunnableExecuter).createLatchedRunnableWrapper(any(Runnable.class),
 any(CountDownLatch.class));
+    }
+
+    private void verifyCommonExecutionChecks() {
+        verify(latch, times(THREADS_NUMBER)).countDown();
+        assertEquals("latch counter value wasn't in the expected value", 0, 
latch.getCount());
+        try {
+            verify(latch, times(1)).await();
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            throw new MockitoException(e.toString());
+        }
+    }
+
+    private void verifyCommonFailureChecks() {
+        try {
+            verify(latch, never()).await();
+        } catch (InterruptedException e) {
+            throw new MockitoException(e.toString());
         }
     }
 }


--
To view, visit http://gerrit.ovirt.org/10354
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia56ff4afac01803ac0753dbf88fcfbfd34e2e56d
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Liron Aravot <lara...@redhat.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to