Liron Aravot has uploaded a new change for review.

Change subject: core: ReconstructMasterDomain -connect/disconnect hosts 
simultaneously
......................................................................

core: ReconstructMasterDomain -connect/disconnect hosts simultaneously

When running the ReconstructMasterDomain/RecoveryStoragePool commands, the
per-host connect/disconnect operations can be performed simultaneously by
different threads in order to reduce the overall run time.

This patch adds LatchedRunnableExecuter, which submits a list of Runnables
wrapped in LatchedRunnableWrapper instances and waits on a CountDownLatch
for them to complete. It is used in ReconstructMasterDomainCommand and is
covered by the test class.

Change-Id: I3c8dac368d60eac85a92762dffc5dbb8062b85ef
Signed-off-by: Liron Aravot <lara...@redhat.com>
---
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
A 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java
M 
backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
3 files changed, 277 insertions(+), 64 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/17/10117/1

diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
index cf3fb37..5f0f720 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
@@ -1,6 +1,7 @@
 package org.ovirt.engine.core.bll.storage;
 
 import java.text.MessageFormat;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.ovirt.engine.core.bll.Backend;
@@ -33,6 +34,7 @@
 import org.ovirt.engine.core.compat.Guid;
 import org.ovirt.engine.core.dal.VdcBllMessages;
 import org.ovirt.engine.core.dal.dbbroker.DbFacade;
+import org.ovirt.engine.core.utils.thread.LatchedRunnableExecuter;
 import org.ovirt.engine.core.utils.transaction.TransactionMethod;
 
 @SuppressWarnings("serial")
@@ -217,50 +219,63 @@
     }
 
     private void connectAndRefreshAllUpHosts(final boolean commandSucceeded) {
-        for (VDS vds : getAllRunningVdssInPool()) {
-            try {
-                if (!_isLastMaster && commandSucceeded && 
connectVdsToNewMaster(vds)) {
-                    try {
-                        runVdsCommand(
-                                VDSCommandType.RefreshStoragePool,
-                                new 
RefreshStoragePoolVDSCommandParameters(vds.getId(),
-                                        getStoragePool().getId(),
-                                        _newMasterStorageDomainId,
-                                        
getStoragePool().getmaster_domain_version()));
-                    } catch (VdcBLLException ex) {
-                        if (VdcBllErrors.StoragePoolUnknown == 
ex.getVdsError().getCode()) {
-                            VDSReturnValue returnVal = runVdsCommand(
-                                    VDSCommandType.ConnectStoragePool,
-                                    new 
ConnectStoragePoolVDSCommandParameters(vds.getId(),
-                                            getStoragePool().getId(), 
vds.getvds_spm_id(),
-                                            _newMasterStorageDomainId, 
getStoragePool()
-                                                    
.getmaster_domain_version()));
-                            if (!returnVal.getSucceeded()) {
-                                log.errorFormat("Post reconstruct actions 
(connectPool) did not complete on host {0} in the pool. error {1}",
-                                        vds.getId(),
-                                        returnVal.getVdsError().getMessage());
+        final boolean isPerformConnectOps = !_isLastMaster && commandSucceeded;
+        final boolean isPerformDisconnect = !getParameters().isInactive();
+        if (isPerformConnectOps || isPerformDisconnect) {
+            List<Runnable> tasks = new LinkedList<Runnable>();
+            for (final VDS vds : getAllRunningVdssInPool()) {
+                tasks.add(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (isPerformConnectOps && 
connectVdsToNewMaster(vds)) {
+                                try {
+                                    runVdsCommand(
+                                            VDSCommandType.RefreshStoragePool,
+                                            new 
RefreshStoragePoolVDSCommandParameters(vds.getId(),
+                                                    getStoragePool().getId(),
+                                                    _newMasterStorageDomainId,
+                                                    
getStoragePool().getmaster_domain_version()));
+                                } catch (VdcBLLException ex) {
+                                    if (VdcBllErrors.StoragePoolUnknown == 
ex.getVdsError().getCode()) {
+                                        VDSReturnValue returnVal = 
runVdsCommand(
+                                                
VDSCommandType.ConnectStoragePool,
+                                                new 
ConnectStoragePoolVDSCommandParameters(vds.getId(),
+                                                        
getStoragePool().getId(), vds.getvds_spm_id(),
+                                                        
_newMasterStorageDomainId, getStoragePool()
+                                                                
.getmaster_domain_version()));
+                                        if (!returnVal.getSucceeded()) {
+                                            log.errorFormat("Post reconstruct 
actions (connectPool) did not complete on host {0} in pool {1}. error {2}",
+                                                    vds.getId(),
+                                                    getStoragePool().getId(),
+                                                    
returnVal.getVdsError().getMessage());
+                                        }
+                                    } else {
+                                        log.errorFormat("Post reconstruct 
actions (refreshPool)"
+                                                + " did not complete on host 
{0} in pool {1}. error {2}",
+                                                vds.getId(),
+                                                getStoragePool().getId(),
+                                                ex.getMessage());
+                                    }
+                                }
                             }
-                        } else {
-                            log.errorFormat("Post reconstruct actions 
(refreshPool)"
+                            // only if we deactivate the storage domain we 
want to disconnect from it.
+                            if (isPerformDisconnect) {
+                                StorageHelperDirector.getInstance()
+                                        
.getItem(getStorageDomain().getstorage_type())
+                                        
.DisconnectStorageFromDomainByVdsId(getStorageDomain(), vds.getId());
+                            }
+
+                        } catch (Exception e) {
+                            log.errorFormat("Post reconstruct actions 
(connectPool,refreshPool,disconnect storage)"
                                     + " did not complete on host {0} in the 
pool. error {1}",
                                     vds.getId(),
-                                    ex.getMessage());
+                                    e.getMessage());
                         }
                     }
-                }
-                // only if we deactivate the storage domain we want to 
disconnect from it.
-                if (!getParameters().isInactive()) {
-                    StorageHelperDirector.getInstance()
-                            .getItem(getStorageDomain().getstorage_type())
-                            
.DisconnectStorageFromDomainByVdsId(getStorageDomain(), vds.getId());
-                }
-
-            } catch (Exception e) {
-                log.errorFormat("Post reconstruct actions 
(connectPool,refreshPool,disconnect storage)"
-                        + " did not complete on host {0} in the pool. error 
{1}",
-                        vds.getId(),
-                        e.getMessage());
+                });
             }
+            new LatchedRunnableExecuter(true, tasks).execute();
         }
     }
 
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java
new file mode 100644
index 0000000..543afbf
--- /dev/null
+++ 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java
@@ -0,0 +1,70 @@
+package org.ovirt.engine.core.utils.thread;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.ovirt.engine.core.utils.ThreadUtils;
+import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil;
+
+public class LatchedRunnableExecuter {
+    boolean isRetrySubmit;
+    List<Runnable> runnables;
+    private final static int sleepTime = 5000;
+
+    /**
+     * @param isRetrySubmit
+     *            - indication whether the executer should retry to submit 
the runnable if it was rejected
+     * @param runnables
+     *            - list of runnables for execution
+     */
+    public LatchedRunnableExecuter(boolean isRetrySubmit, List<Runnable> 
runnables) {
+        super();
+        this.isRetrySubmit = isRetrySubmit;
+        this.runnables = runnables;
+    }
+
+    protected void executeRunnable(LatchedRunnableWrapper runnable) {
+        ThreadPoolUtil.execute(runnable);
+    }
+
+    protected void performSleep() {
+        ThreadUtils.sleep(sleepTime);
+    }
+
+    protected LatchedRunnableWrapper createLatchedRunnableWrapper(Runnable 
runnable, CountDownLatch latch) {
+        return new LatchedRunnableWrapper(runnable, latch);
+    }
+
+    protected CountDownLatch createCountDownLatch() {
+        return new CountDownLatch(runnables.size());
+    }
+
+    /**
+     * Executes the list of Runnables provided to this executer and returns 
when they complete. If isRetrySubmit is set to
+     * true, a rejected Runnable submission is retried after each 'sleepTime' 
pause (the value is handed to ThreadUtils.sleep, so the unit is
+     * presumably milliseconds rather than seconds - to be confirmed). Otherwise, the caught exception is re-thrown.
+     */
+    public void execute() {
+        CountDownLatch latch = createCountDownLatch();
+        for (Runnable runnable : runnables) {
+            LatchedRunnableWrapper latchWrapper = 
createLatchedRunnableWrapper(runnable, latch);
+            while (true) {
+                try {
+                    executeRunnable(latchWrapper);
+                    break;
+                } catch (RejectedExecutionException e) {
+                    if (isRetrySubmit) {
+                        performSleep();
+                    } else {
+                        throw e;
+                    }
+                }
+            }
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+        }
+    }
+}
diff --git 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
index fafc679..decd699 100644
--- 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
+++ 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
@@ -3,30 +3,46 @@
  */
 package org.ovirt.engine.core.utils.thread;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
-import static org.junit.Assert.assertEquals;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.exceptions.base.MockitoException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class LatchedRunnableWrapperTest {
+    private static final int THREADS_NUMBER = 20;
 
     private AtomicInteger counter;
+    private LatchedRunnableExecuter latchedRunnableExecuter;
+    private CountDownLatch latch;
+    private ExecutorService threadPool;
+    private RunnableCreator runnableCreator;
 
     private interface RunnableCreator {
         Runnable createRunnable();
     }
 
-
     private class DummyRunnable implements Runnable {
-
-
-        public DummyRunnable() {
-        }
-
         @Override
         public void run() {
             counter.incrementAndGet();
@@ -36,34 +52,146 @@
     @Before
     public void setup() {
         counter = new AtomicInteger();
-    }
-
-    /**
-     *
-     */
-    @Test
-    public void latchedRunnableWrapperTest() {
-        final int threadsNumber = 100;
-        runThreads(threadsNumber, new RunnableCreator() {
+        threadPool = Executors.newFixedThreadPool(THREADS_NUMBER);
+        runnableCreator = new RunnableCreator() {
 
             @Override
             public Runnable createRunnable() {
                 return new DummyRunnable();
             }
-        });
-        assertEquals(threadsNumber, counter.intValue());
+        };
     }
 
-    private void runThreads(final int threadsNumber, RunnableCreator 
runnableCreator) {
-        ExecutorService fixedThreadPool = 
Executors.newFixedThreadPool(threadsNumber);
-        CountDownLatch latch = new CountDownLatch(threadsNumber);
-        for (int index = 0; index < threadsNumber; index++) {
-            fixedThreadPool.execute(new 
LatchedRunnableWrapper(runnableCreator.createRunnable(), latch));
-        }
+    @Test
+    public void regularExecution() {
+        prepareMocks(runnableCreator, false, THREADS_NUMBER);
+        latchedRunnableExecuter.execute();
+        assertEquals("the counter wasn't incremented the expected number of 
times", THREADS_NUMBER, counter.intValue());
+        verifyCommonExecutionChecks();
+    }
+
+    @Test
+    public void submitFullFailureNoRetry() {
+        boolean gotException = false;
+        prepareMocks(runnableCreator, false, 0);
         try {
-            latch.await();
+            latchedRunnableExecuter.execute();
+        } catch (RejectedExecutionException e) {
+            gotException = true;
+        }
+        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
+        assertEquals("the counter was incremented more times then expected", 
0, counter.intValue());
+        verify(latchedRunnableExecuter, never()).performSleep();
+        assertEquals("latch counter wasn't in the expected value", 
THREADS_NUMBER, latch.getCount());
+        verifyCommonFailureChecks();
+    }
+
+    @Test
+    public void submitFullFailureWithRetry() {
+        prepareMocks(runnableCreator, true, 0);
+        latchedRunnableExecuter.execute();
+        assertEquals("the counter wasn't incremented the expected number of 
times", THREADS_NUMBER, counter.intValue());
+        verify(latchedRunnableExecuter, times(THREADS_NUMBER)).performSleep();
+        verifyCommonExecutionChecks();
+    }
+
+    @Test
+    public void submitPartialFailureWithRetry() {
+        int failExecuteFirstTime = 10;
+        prepareMocks(runnableCreator, true, THREADS_NUMBER - 
failExecuteFirstTime);
+        latchedRunnableExecuter.execute();
+        assertEquals("the counter wasn't incremented the expected number of 
times", THREADS_NUMBER, counter.intValue());
+        verify(latchedRunnableExecuter, 
times(failExecuteFirstTime)).performSleep();
+        verifyCommonExecutionChecks();
+    }
+
+    @Test
+    public void submitPartialFailureNoRetry() {
+        int expectedToRun = THREADS_NUMBER - 5;
+        prepareMocks(runnableCreator, false, expectedToRun);
+        boolean gotException = false;
+        try {
+            latchedRunnableExecuter.execute();
+        } catch (RejectedExecutionException e) {
+            gotException = true;
+        }
+        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
+        assertFalse("the counter wasn't incremented the expected number of 
times", expectedToRun < counter.intValue());
+        verify(latchedRunnableExecuter, never()).performSleep();
+        assertTrue("latch counter value was lower than expected", 
latch.getCount() > 0);
+        assertTrue("latch counter value was greater than expected", 
latch.getCount() < THREADS_NUMBER);
+        verifyCommonFailureChecks();
+    }
+
+    /**
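+     * @param runnableCreator
+     *            - creates the runnables that are handed to the executer
+     * @param isSubmitRetry
+     *            - whether the executer should retry rejected submissions
+     * @param countToExecuteOnFirstRun
+     *            - number of runnables whose first submission is accepted; the rest are rejected once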
+     */
+    private void prepareMocks(RunnableCreator runnableCreator, boolean 
isSubmitRetry, final int countToExecuteOnFirstRun) {
+        List<Runnable> runnables = new LinkedList<Runnable>();
+        for (int index = 0; index < THREADS_NUMBER; index++) {
+            runnables.add(runnableCreator.createRunnable());
+        }
+
+        latchedRunnableExecuter = spy(new 
LatchedRunnableExecuter(isSubmitRetry, runnables));
+        latch = spy(latchedRunnableExecuter.createCountDownLatch());
+
+        doReturn(latch).when(latchedRunnableExecuter).createCountDownLatch();
+        doNothing().when(latchedRunnableExecuter).performSleep();
+
+        final HashSet<Runnable> executedRunnables = new HashSet<Runnable>();
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                threadPool.execute((Runnable) invocation.getArguments()[0]);
+                return null;
+            }
+        
}).when(latchedRunnableExecuter).executeRunnable(any(LatchedRunnableWrapper.class));
+
+        doAnswer(new Answer<LatchedRunnableWrapper>() {
+            @Override
+            public LatchedRunnableWrapper answer(InvocationOnMock invocation) 
throws Throwable {
+                final LatchedRunnableWrapper toReturn =
+                        new LatchedRunnableWrapper((Runnable) 
invocation.getArguments()[0],
+                                (CountDownLatch) invocation.getArguments()[1]);
+                doAnswer(new Answer<Void>() {
+                    @Override
+                    public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                        // A runnable is submitted if it was already attempted once 
(executedRunnables.contains() returns true,
+                        // i.e. this is a retry, which only happens when 
isSubmitRetry is set) or if fewer than
+                        // countToExecuteOnFirstRun runnables were attempted so far; otherwise its first submission is rejected.
+                        if (executedRunnables.contains(toReturn) || 
executedRunnables.size() < countToExecuteOnFirstRun) {
+                            threadPool.execute(toReturn);
+                            executedRunnables.add(toReturn);
+                        } else {
+                            executedRunnables.add(toReturn);
+                            throw new RejectedExecutionException();
+                        }
+                        return null;
+                    }
+                }).when(latchedRunnableExecuter).executeRunnable(toReturn);
+                return toReturn;
+            }
+        
}).when(latchedRunnableExecuter).createLatchedRunnableWrapper(any(Runnable.class),
 any(CountDownLatch.class));
+    }
+
+    private void verifyCommonExecutionChecks() {
+        verify(latch, times(THREADS_NUMBER)).countDown();
+        assertEquals("latch counter value wasn't in the expected value", 0, 
latch.getCount());
+        try {
+            verify(latch, times(1)).await();
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            throw new MockitoException(e.toString());
+        }
+    }
+
+    private void verifyCommonFailureChecks() {
+        try {
+            verify(latch, never()).await();
+        } catch (InterruptedException e) {
+            throw new MockitoException(e.toString());
         }
     }
 }


--
To view, visit http://gerrit.ovirt.org/10117
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3c8dac368d60eac85a92762dffc5dbb8062b85ef
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Liron Aravot <lara...@redhat.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to