Liron Aravot has uploaded a new change for review. Change subject: core: ReconstructMasterDomain -connect/disconnect hosts simultaneously ......................................................................
core: ReconstructMasterDomain -connect/disconnect hosts simultaneously When running ReconstructMasterDomain/RecoveryStoragePool commands - any connect/disconnect operations can be done simultaneously by different threads in order to improve the runtime. This patch adds LatchedRunnableExecuter to execute LatchedRunnableWrapper instances using CountDownLatch and makes use of it in ReconstructMasterDomainCommand. Tests for the executer are added to LatchedRunnableWrapperTest. Change-Id: I3c8dac368d60eac85a92762dffc5dbb8062b85ef Signed-off-by: Liron Aravot <lara...@redhat.com> --- M backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java A backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java M backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java 3 files changed, 292 insertions(+), 64 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/17/10117/1 diff --git a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java index cf3fb37..5f0f720 100644 --- a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java +++ b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java @@ -1,6 +1,7 @@ package org.ovirt.engine.core.bll.storage; import java.text.MessageFormat; +import java.util.LinkedList; import java.util.List; import org.ovirt.engine.core.bll.Backend; @@ -33,6 +34,7 @@ import org.ovirt.engine.core.compat.Guid; import org.ovirt.engine.core.dal.VdcBllMessages; import org.ovirt.engine.core.dal.dbbroker.DbFacade; +import org.ovirt.engine.core.utils.thread.LatchedRunnableExecuter; import org.ovirt.engine.core.utils.transaction.TransactionMethod; 
@SuppressWarnings("serial") @@ -217,50 +219,63 @@ } private void connectAndRefreshAllUpHosts(final boolean commandSucceeded) { - for (VDS vds : getAllRunningVdssInPool()) { - try { - if (!_isLastMaster && commandSucceeded && connectVdsToNewMaster(vds)) { - try { - runVdsCommand( - VDSCommandType.RefreshStoragePool, - new RefreshStoragePoolVDSCommandParameters(vds.getId(), - getStoragePool().getId(), - _newMasterStorageDomainId, - getStoragePool().getmaster_domain_version())); - } catch (VdcBLLException ex) { - if (VdcBllErrors.StoragePoolUnknown == ex.getVdsError().getCode()) { - VDSReturnValue returnVal = runVdsCommand( - VDSCommandType.ConnectStoragePool, - new ConnectStoragePoolVDSCommandParameters(vds.getId(), - getStoragePool().getId(), vds.getvds_spm_id(), - _newMasterStorageDomainId, getStoragePool() - .getmaster_domain_version())); - if (!returnVal.getSucceeded()) { - log.errorFormat("Post reconstruct actions (connectPool) did not complete on host {0} in the pool. error {1}", - vds.getId(), - returnVal.getVdsError().getMessage()); + final boolean isPerformConnectOps = !_isLastMaster && commandSucceeded; + final boolean isPerformDisconnect = !getParameters().isInactive(); + if (isPerformConnectOps || isPerformDisconnect) { + List<Runnable> tasks = new LinkedList<Runnable>(); + for (final VDS vds : getAllRunningVdssInPool()) { + tasks.add(new Runnable() { + @Override + public void run() { + try { + if (isPerformConnectOps && connectVdsToNewMaster(vds)) { + try { + runVdsCommand( + VDSCommandType.RefreshStoragePool, + new RefreshStoragePoolVDSCommandParameters(vds.getId(), + getStoragePool().getId(), + _newMasterStorageDomainId, + getStoragePool().getmaster_domain_version())); + } catch (VdcBLLException ex) { + if (VdcBllErrors.StoragePoolUnknown == ex.getVdsError().getCode()) { + VDSReturnValue returnVal = runVdsCommand( + VDSCommandType.ConnectStoragePool, + new ConnectStoragePoolVDSCommandParameters(vds.getId(), + getStoragePool().getId(), 
vds.getvds_spm_id(), + _newMasterStorageDomainId, getStoragePool() + .getmaster_domain_version())); + if (!returnVal.getSucceeded()) { + log.errorFormat("Post reconstruct actions (connectPool) did not complete on host {0} in pool {1}. error {2}", + vds.getId(), + getStoragePool().getId(), + returnVal.getVdsError().getMessage()); + } + } else { + log.errorFormat("Post reconstruct actions (refreshPool)" + + " did not complete on host {0} in pool {1}. error {2}", + vds.getId(), + getStoragePool().getId(), + ex.getMessage()); + } + } } - } else { - log.errorFormat("Post reconstruct actions (refreshPool)" + // only if we deactivate the storage domain we want to disconnect from it. + if (isPerformDisconnect) { + StorageHelperDirector.getInstance() + .getItem(getStorageDomain().getstorage_type()) + .DisconnectStorageFromDomainByVdsId(getStorageDomain(), vds.getId()); + } + + } catch (Exception e) { + log.errorFormat("Post reconstruct actions (connectPool,refreshPool,disconnect storage)" + " did not complete on host {0} in the pool. error {1}", vds.getId(), - ex.getMessage()); + e.getMessage()); } } - } - // only if we deactivate the storage domain we want to disconnect from it. - if (!getParameters().isInactive()) { - StorageHelperDirector.getInstance() - .getItem(getStorageDomain().getstorage_type()) - .DisconnectStorageFromDomainByVdsId(getStorageDomain(), vds.getId()); - } - - } catch (Exception e) { - log.errorFormat("Post reconstruct actions (connectPool,refreshPool,disconnect storage)" - + " did not complete on host {0} in the pool. 
error {1}", - vds.getId(), - e.getMessage()); + }); } + new LatchedRunnableExecuter(true, tasks).execute(); } } diff --git a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java new file mode 100644 index 0000000..543afbf --- /dev/null +++ b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecuter.java @@ -0,0 +1,76 @@ +package org.ovirt.engine.core.utils.thread; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; + +import org.ovirt.engine.core.utils.ThreadUtils; +import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil; + +/** + * Executes a list of {@link Runnable}s concurrently (by default via {@link ThreadPoolUtil}) and waits, using a + * {@link CountDownLatch}, until all of them have finished. + */ +public class LatchedRunnableExecuter { + private final boolean isRetrySubmit; + private final List<Runnable> runnables; + private static final int sleepTime = 5000; + + /** + * @param isRetrySubmit + * - indication whether the executer should retry to submit the runnable if it was rejected + * @param runnables + * - list of runnables for execution + */ + public LatchedRunnableExecuter(boolean isRetrySubmit, List<Runnable> runnables) { + super(); + this.isRetrySubmit = isRetrySubmit; + this.runnables = runnables; + } + + protected void executeRunnable(LatchedRunnableWrapper runnable) { + ThreadPoolUtil.execute(runnable); + } + + protected void performSleep() { + ThreadUtils.sleep(sleepTime); + } + + protected LatchedRunnableWrapper createLatchedRunnableWrapper(Runnable runnable, CountDownLatch latch) { + return new LatchedRunnableWrapper(runnable, latch); + } + + protected CountDownLatch createCountDownLatch() { + return new CountDownLatch(runnables.size()); + } + + /** + * executes the list of Runnable 
provided to this executer and returns when it completes. 
if isRetrySubmit is set to + true, in case of rejected Runnable submission there will be an attempt to submit the runnable every 'sleepTime' + milliseconds. otherwise, the caught exception will be re-thrown. if the calling thread is interrupted while waiting, this method returns early with the thread's interrupt status set. + */ + public void execute() { + CountDownLatch latch = createCountDownLatch(); + for (Runnable runnable : runnables) { + LatchedRunnableWrapper latchWrapper = createLatchedRunnableWrapper(runnable, latch); + while (true) { + try { + executeRunnable(latchWrapper); + break; + } catch (RejectedExecutionException e) { + if (isRetrySubmit) { + performSleep(); + } else { + throw e; + } + } + } + } + try { + latch.await(); + } catch (InterruptedException e) { + // don't swallow the interruption - restore the interrupt status so the caller can still detect it + Thread.currentThread().interrupt(); + } + } +} diff --git a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java index fafc679..decd699 100644 --- a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java +++ b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java @@ -3,30 +3,48 @@ */ package org.ovirt.engine.core.utils.thread; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashSet; +import java.util.LinkedList; +import 
java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static 
org.junit.Assert.assertEquals; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.exceptions.base.MockitoException; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class LatchedRunnableWrapperTest { + private static final int THREADS_NUMBER = 20; private AtomicInteger counter; + private LatchedRunnableExecuter latchedRunnableExecuter; + private CountDownLatch latch; + private ExecutorService threadPool; + private RunnableCreator runnableCreator; private interface RunnableCreator { Runnable createRunnable(); } private class DummyRunnable implements Runnable { - - - public DummyRunnable() { - } - @Override public void run() { counter.incrementAndGet(); @@ -36,34 +54,153 @@ @Before public void setup() { counter = new AtomicInteger(); - } - - /** - * - */ - @Test - public void latchedRunnableWrapperTest() { - final int threadsNumber = 100; - runThreads(threadsNumber, new RunnableCreator() { + threadPool = Executors.newFixedThreadPool(THREADS_NUMBER); + runnableCreator = new RunnableCreator() { @Override public Runnable createRunnable() { return new DummyRunnable(); } - }); - assertEquals(threadsNumber, counter.intValue()); + }; } + + @After + public void tearDown() { + threadPool.shutdownNow(); + } - private void runThreads(final int threadsNumber, RunnableCreator runnableCreator) { - ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadsNumber); - CountDownLatch latch = new CountDownLatch(threadsNumber); - for (int index = 0; index < threadsNumber; index++) { - fixedThreadPool.execute(new LatchedRunnableWrapper(runnableCreator.createRunnable(), latch)); - } + @Test + public void regularExecution() { + prepareMocks(runnableCreator, false, THREADS_NUMBER); + latchedRunnableExecuter.execute(); + assertEquals("the counter wasn't incremented the expected number of times", THREADS_NUMBER, counter.intValue()); + verifyCommonExecutionChecks(); + } + + @Test + public void 
submitFullFailureNoRetry() { + boolean gotException = false; + prepareMocks(runnableCreator, 
false, 0); try { - latch.await(); + latchedRunnableExecuter.execute(); + } catch (RejectedExecutionException e) { + gotException = true; + } + assertTrue("expected RejectedExecutionException wasn't thrown", gotException); + assertEquals("the counter was incremented more times than expected", 0, counter.intValue()); + verify(latchedRunnableExecuter, never()).performSleep(); + assertEquals("latch counter wasn't in the expected value", THREADS_NUMBER, latch.getCount()); + verifyCommonFailureChecks(); + } + + @Test + public void submitFullFailureWithRetry() { + prepareMocks(runnableCreator, true, 0); + latchedRunnableExecuter.execute(); + assertEquals("the counter wasn't incremented the expected number of times", THREADS_NUMBER, counter.intValue()); + verify(latchedRunnableExecuter, times(THREADS_NUMBER)).performSleep(); + verifyCommonExecutionChecks(); + } + + @Test + public void submitPartialFailureWithRetry() { + int failExecuteFirstTime = 10; + prepareMocks(runnableCreator, true, THREADS_NUMBER - failExecuteFirstTime); + latchedRunnableExecuter.execute(); + assertEquals("the counter wasn't incremented the expected number of times", THREADS_NUMBER, counter.intValue()); + verify(latchedRunnableExecuter, times(failExecuteFirstTime)).performSleep(); + verifyCommonExecutionChecks(); + } + + @Test + public void submitPartialFailureNoRetry() throws InterruptedException { + int expectedToRun = THREADS_NUMBER - 5; + prepareMocks(runnableCreator, false, expectedToRun); + boolean gotException = false; + try { + latchedRunnableExecuter.execute(); + } catch (RejectedExecutionException e) { + gotException = true; + } + assertTrue("expected RejectedExecutionException wasn't thrown", gotException); + // the accepted runnables run asynchronously - wait for them to finish before checking the counters + threadPool.shutdown(); + assertTrue("submitted runnables didn't finish in time", threadPool.awaitTermination(10, TimeUnit.SECONDS)); + assertEquals("the counter wasn't incremented 
the expected number of times", expectedToRun, counter.intValue()); + verify(latchedRunnableExecuter, never()).performSleep(); + assertEquals("latch counter wasn't in the expected value", THREADS_NUMBER - expectedToRun, latch.getCount()); 
+ verifyCommonFailureChecks(); + } + + /** + * @param runnableCreator - creates the runnables handed to the executer + * @param isSubmitRetry - whether the executer should retry rejected submissions + * @param countToExecuteOnFirstRun - how many runnables are accepted on their first submission; the rest are rejected once + */ + private void prepareMocks(RunnableCreator runnableCreator, boolean isSubmitRetry, final int countToExecuteOnFirstRun) { + List<Runnable> runnables = new LinkedList<Runnable>(); + for (int index = 0; index < THREADS_NUMBER; index++) { + runnables.add(runnableCreator.createRunnable()); + } + + latchedRunnableExecuter = spy(new LatchedRunnableExecuter(isSubmitRetry, runnables)); + latch = spy(latchedRunnableExecuter.createCountDownLatch()); + + doReturn(latch).when(latchedRunnableExecuter).createCountDownLatch(); + doNothing().when(latchedRunnableExecuter).performSleep(); + + final HashSet<Runnable> executedRunnables = new HashSet<Runnable>(); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.execute((Runnable) invocation.getArguments()[0]); + return null; + } + }).when(latchedRunnableExecuter).executeRunnable(any(LatchedRunnableWrapper.class)); + + doAnswer(new Answer<LatchedRunnableWrapper>() { + @Override + public LatchedRunnableWrapper answer(InvocationOnMock invocation) throws Throwable { + final LatchedRunnableWrapper toReturn = + new LatchedRunnableWrapper((Runnable) invocation.getArguments()[0], + (CountDownLatch) invocation.getArguments()[1]); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + // the first countToExecuteOnFirstRun submissions are accepted and later first submissions are rejected; + // a rejected runnable is already in executedRunnables, so resubmitting it (done by the executer only + // when isSubmitRetry is true) is accepted + if (executedRunnables.contains(toReturn) || 
executedRunnables.size() < countToExecuteOnFirstRun) { + threadPool.execute(toReturn); + executedRunnables.add(toReturn); + } else { + executedRunnables.add(toReturn); + throw new 
RejectedExecutionException(); + } + return null; + } + }).when(latchedRunnableExecuter).executeRunnable(toReturn); + return toReturn; + } + }).when(latchedRunnableExecuter).createLatchedRunnableWrapper(any(Runnable.class), any(CountDownLatch.class)); + } + + private void verifyCommonExecutionChecks() { + verify(latch, times(THREADS_NUMBER)).countDown(); + assertEquals("latch counter value wasn't in the expected value", 0, latch.getCount()); + try { + verify(latch, times(1)).await(); } catch (InterruptedException e) { - e.printStackTrace(); + throw new MockitoException(e.toString()); + } + } + + private void verifyCommonFailureChecks() { + try { + verify(latch, never()).await(); + } catch (InterruptedException e) { + throw new MockitoException(e.toString()); } } } -- To view, visit http://gerrit.ovirt.org/10117 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3c8dac368d60eac85a92762dffc5dbb8062b85ef Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Liron Aravot <lara...@redhat.com> _______________________________________________ Engine-patches mailing list Engine-patches@ovirt.org http://lists.ovirt.org/mailman/listinfo/engine-patches