andyvuong commented on a change in pull request #1081: SOLR-13101: Concurrency tests for SHARED collection.
URL: https://github.com/apache/lucene-solr/pull/1081#discussion_r358427868
 
 

 ##########
 File path: 
solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
 ##########
 @@ -49,411 +58,673 @@
 /**
  * Tests around synchronization of concurrent indexing, pushes and pulls
  * happening on a core of a shared collection {@link 
DocCollection#getSharedIndex()}
- * todo: add tests for failover scenarios and involve query pulls 
  */
 public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  private static final String COLLECTION_NAME = "sharedCollection";
+  private static final String SHARD_NAME = "shard1";
+  /**
+   * Number of serial indexing iterations for each test. This is the main setting; query and failover iterations
+   * stop after indexing ends. The higher the value, the longer the tests run.
+   */
+  private static int INDEXING_ITERATIONS = TEST_NIGHTLY ? 100 : 20;
+  /**
+   * Maximum number of concurrent indexing requests per indexing iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION = 
10;
+  /**
+   * Maximum number of docs per indexing request.
+   */
+  private static int MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST = 100;
+  /**
+   * Indexing can fail because of leader failures (especially when the test {@link #includeFailovers()}).
+   * The test will re-attempt up to this number of times before bailing out. For the test to succeed,
+   * each indexing request has to succeed within this many attempts.
+   */
+  private static int MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST = 10;
+  /**
+   * Maximum number of concurrent query requests per query iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION = 10;
+  /**
+   * Querying is faster than indexing; to pace it better with indexing, a delay is added between each query iteration.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_QUERY_ITERATION = 50;
+  /**
+   * Minimum time between each failover.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION = 500;
+  /**
+   * Path for local shared store
+   */
   private static Path sharedStoreRootPath;
 
+  /**
+   * Manages test state from start to end.
+   */
+  private TestState testState;
+
   @BeforeClass
   public static void setupClass() throws Exception {
     sharedStoreRootPath = createTempDir("tempDir");
   }
 
+  @Before
+  public void setupTest() throws Exception {
+    int numNodes = 4;
+    setupCluster(numNodes);
+    testState = new TestState();
+    setupSolrNodesForTest();
+
+    int maxShardsPerNode = 1;
+    // One less than the number of nodes.
+    // The extra node will be used at the end of the test to verify
+    // the contents of the shared store by querying for all docs on a new replica.
+    int numReplicas = numNodes - 1;
+    // Later on we can consider choosing a random number of shards and replicas.
+    // To handle multiple shards, we need to update the code where SHARD_NAME is used.
+    setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, 
numReplicas, SHARD_NAME);
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+  }
+
   /**
-   * Tests issuing random number of concurrent indexing requests in a given 
range for a shared core and making sure they all succeed.
+   * Tests that concurrent indexing succeeds.
    */
   @Test
-  public void testHighConcurrentIndexing() throws Exception {
-    int maxIndexingThreads = 100;
-    int maxDocsPerThread = 100;
-    testConcurrentIndexing(maxIndexingThreads, maxDocsPerThread);
+  public void testIndexing() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    run();
   }
 
   /**
-   * Test ensuring two indexing requests interleave in desired ways and 
succeed.
+   * Tests that concurrent indexing with concurrent queries succeeds.
    */
-  //  @Test todo: leaking threads, test only issue
-  public void TODOtestPossibleInterleaving() throws Exception {
-
-    // completely serialized
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.BlobPushFinished,
-            SharedCoreStage.LocalIndexingStarted,
-            true));
-
-    // second pushes for first one too
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.LocalIndexingFinished, 
-            SharedCoreStage.BlobPushFinished,
-            true));
+  @Test
+  public void testIndexingQueries() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
 
   /**
-   * Test ensuring two indexing requests do not interleave in impossible ways 
even when forced and still succeed.
+   * Tests that concurrent indexing with deletes and concurrent queries 
succeed.
    */
-  //  @Test todo: leaking threads, test only issue
-  public void TODOtestImpossibleInterleaving() throws Exception {
-    // push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.ZkUpdateFinished,
-            false));
-
-    // another push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.LocalCacheUpdateFinished,
-            false));
+  @Test
+  public void testIndexingQueriesDeletes() throws Exception {
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
 
-  private void testConcurrentIndexing(int maxIndexingThreads, int 
maxDocsPerThread) throws Exception {
-    int numIndexingThreads = new Random().nextInt(maxIndexingThreads) + 1;;
-    testConcurrentIndexing(numIndexingThreads, maxDocsPerThread, null);
+  /**
+   * Tests that concurrent indexing with deletes, concurrent queries and 
explicit failovers succeed.
+   */
+  // @Test 
+  // TODO: This test flaps from time to time. The symptom of the failure is missing docs, i.e. indexing is declared
+  //       successful but a query cannot find all of the docs. I was able to repro this with an NRT collection on
+  //       vanilla 8.3 too. I have not root-caused it yet. Keeping this test disabled until the problem is
+  //       root-caused and fixed.
+  public void todoTestIndexingQueriesDeletesFailovers() throws Exception { 
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    includeFailovers();
+    run();
   }
 
-  private void testConcurrentIndexing(IndexingThreadInterleaver interleaver) 
throws Exception {
-    testConcurrentIndexing(2, 10, interleaver);
+  /**
+   * Starts all the threads that are included in the test (indexing, queries and failovers) in parallel.
+   * Then waits for them to finish (run length depends on {@link #INDEXING_ITERATIONS}).
+   * At the end it makes sure that no critical section was breached and no unexpected error occurred.
+   * Then verifies the contents of the shared store by querying for all docs on a new replica.
+   */
+  private void run() throws Exception {
+    testState.startIncludedThreads();
+    testState.waitForThreadsToStop();
+    analyzeCoreConcurrencyStagesForBreaches();
+    testState.checkErrors();
+    Replica newReplica = addReplica();
+    queryNewReplicaAndVerifyAllDocsFound(newReplica);
   }
 
   /**
-   * Start desired number of concurrent indexing threads with each indexing 
random number of docs between 1 and maxDocsPerThread.
+   * Adds a thread to the test that runs for {@link #INDEXING_ITERATIONS} iterations or until it is interrupted.
+   * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION} threads
+   * by calling {@link #createIndexingThreads(int, int, boolean)}, starts them concurrently and waits for them to finish
+   * before going to the next iteration. Each indexing thread adds between 1 and {@link #MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST}
+   * docs.
    *
-   * At the end it verifies everything got indexed successfully on leader. No 
critical section got breached.
-   * Also verify the integrity of shared store contents by pulling them on a 
follower replica.
+   * @param includeDeletes whether to randomly mark some docs for deletion and 
delete them in subsequent indexing requests
+   *                       or not
    */
-  private void testConcurrentIndexing(int numIndexingThreads, int 
maxDocsPerThread, IndexingThreadInterleaver interleaver) throws Exception {
-    setupCluster(2);
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-
-    // this map tracks the async pull queues per solr process
-    Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new 
HashMap<>();
-
-    JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
-    CoreStorageClient storageClient1 = 
setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1),
 solrProcess1);
-    Map<String, CountDownLatch> asyncPullLatches1 = 
configureTestBlobProcessForNode(solrProcess1);
-
-    JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
-    CoreStorageClient storageClient2 = 
setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2),
 solrProcess2);
-    Map<String, CountDownLatch> asyncPullLatches2 = 
configureTestBlobProcessForNode(solrProcess2);
-
-    solrProcessesTaskTracker.put(solrProcess1.getNodeName(), 
asyncPullLatches1);
-    solrProcessesTaskTracker.put(solrProcess2.getNodeName(), 
asyncPullLatches2);
-
-    String collectionName = "sharedCollection";
-    int maxShardsPerNode = 1;
-    int numReplicas = 2;
-    // specify a comma-delimited string of shard names for multiple shards 
when using
-    // an implicit router
-    String shardNames = "shard1";
-    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, 
numReplicas, shardNames);
-
-    // get the leader replica and follower replicas
-    DocCollection collection = 
cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica shardLeaderReplica = collection.getLeader("shard1");
-    Replica followerReplica = null;
-    for (Replica repl : collection.getSlice("shard1").getReplicas()) {
-      if (repl.getName() != shardLeaderReplica.getName()) {
-        followerReplica = repl;
-        break;
+  private void includeIndexing(boolean includeDeletes) {
+    Thread t = new Thread(() -> {
+      try {
+        for (int i = 0; i < INDEXING_ITERATIONS && 
!testState.stopRunning.get(); i++) {
+          int numIndexingThreads = 
random().nextInt(MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION) + 1;
+          int numDocsToAddPerThread = 
random().nextInt(MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST) + 1;
+          Thread[] indexingThreads = createIndexingThreads(numIndexingThreads, 
numDocsToAddPerThread, includeDeletes);
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].start();
+          }
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].join();
+          }
+          if (Thread.interrupted()) {
+            // we have been interrupted so we will stop running
+            testState.stopRunning.set(true);
+          }
+        }
+      } catch (Exception ex) {
+        testState.indexingErrors.add(ex.getMessage());
       }
-    }
+      // everything else stops running when indexing finishes
+      testState.stopRunning.set(true);
+    });
+    testState.includeThread(t);
+  }
 
-    
-   JettySolrRunner shardLeaderSolrRunner = null;
-   for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
-     if(solrRunner.getNodeName().equals(shardLeaderReplica.getNodeName())){
-       shardLeaderSolrRunner = solrRunner;
-       break;
-     }
-   }
-   List<String> progress = new ArrayList<>();
-
-   if(interleaver != null) {
-     configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, 
progress, interleaver);
-   } else {
-     configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, 
progress);
-   }
-
-    AtomicInteger totalDocs= new AtomicInteger(0);
+  /**
+   * Creates {@code numIndexingThreads} threads, each adding {@code numDocsToAddPerThread} docs.
+   *
+   * @param includeDeletes whether to randomly mark some docs for deletion and 
delete them in subsequent indexing requests
+   *                       or not
+   */
+  private Thread[] createIndexingThreads(int numIndexingThreads, int 
numDocsToAddPerThread, boolean includeDeletes) throws Exception {
     log.info("numIndexingThreads=" + numIndexingThreads);
     Thread[] indexingThreads = new Thread[numIndexingThreads];
-    ConcurrentLinkedQueue<String> indexingErrors = new 
ConcurrentLinkedQueue<>();
-    for (int i = 0; i < numIndexingThreads; i++) {
+    for (int i = 0; i < numIndexingThreads && !testState.stopRunning.get(); 
i++) {
       indexingThreads[i] = new Thread(() -> {
-        try {
-          // index between 1 to maxDocsPerThread docs
-          int numDocs = new Random().nextInt(maxDocsPerThread) + 1;
-          log.info("numDocs=" + numDocs);
-          UpdateRequest updateReq = new UpdateRequest();
-          for (int j = 0; j < numDocs; j++) {
-            int docId = totalDocs.incrementAndGet();
-            updateReq.add("id", Integer.toString(docId));
+        List<String> idsToAdd = new ArrayList<>();
+        // prepare the list of docs to add and delete outside the reattempt 
loop
+        for (int j = 0; j < numDocsToAddPerThread; j++) {
+          String docId = 
Integer.toString(testState.docIdGenerator.incrementAndGet());
+          idsToAdd.add(docId);
+        }
+        List<String> idsToDelete = testState.idBatchesToDelete.poll();
+
+        // attempt until success or until max attempts are exhausted
+        for (int j = 0; j < MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST; j++) {
+          try {
+            String message = "attempt=" + (j + 1) + " numDocsToAdd=" + 
numDocsToAddPerThread + " docsToAdd=" + idsToAdd.toString();
+            if (idsToDelete != null) {
+              message += " numDocsToDelete=" + idsToDelete.size() + " 
docsToDelete=" + idsToDelete.toString();
+            }
+            log.info(message);
+
+            UpdateRequest updateReq = new UpdateRequest();
+            for (int k = 0; k < idsToAdd.size(); k++) {
+              updateReq.add("id", idsToAdd.get(k));
+            }
+            if (includeDeletes && idsToDelete != null) {
+              updateReq.deleteById(idsToDelete);
+            }
+            processUpdateRequest(updateReq);
+
+            testState.numDocsIndexed.addAndGet(numDocsToAddPerThread);
+            if (idsToDelete != null) {
+              testState.idsDeleted.addAll(idsToDelete);
+            }
+
+            // randomly select some docs that can be deleted
+            if (includeDeletes) {
+              List<String> idsThatCanBeDeleted = new ArrayList<>();
+              for (String indexedId : idsToAdd) {
+                if (random().nextBoolean()) {
+                  idsThatCanBeDeleted.add(indexedId);
+                }
+              }
+              if (!idsThatCanBeDeleted.isEmpty()) {
+                testState.idBatchesToDelete.offer(idsThatCanBeDeleted);
+              }
+            }
+            // indexing was successful, stop attempting
+            break;
+          } catch (Exception ex) {
+            // last attempt also failed, record the error
+            if (j == MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST - 1) {
+              
testState.indexingErrors.add(Throwables.getStackTraceAsString(ex));
+            }
           }
-          updateReq.commit(cloudClient, collectionName);
-        } catch (Exception ex) {
-          indexingErrors.add(ex.getMessage());
         }
       });
     }
+    return indexingThreads;
+  }
 
-    for (int i = 0; i < numIndexingThreads; i++) {
-      indexingThreads[i].start();
-    }
+  /**
+   * Sends the update request to the server, randomly choosing whether to send it with commit=true or not.
+   */
+  private void processUpdateRequest(UpdateRequest request) throws Exception {
+    UpdateResponse response = random().nextBoolean()
+        ? request.process(cluster.getSolrClient(), COLLECTION_NAME)
+        : request.commit(cluster.getSolrClient(), COLLECTION_NAME);
 
-    for (int i = 0; i < numIndexingThreads; i++) {
-      indexingThreads[i].join();
+    if (response.getStatus() != 0) {
+      throw new RuntimeException("Update request failed with status=" + 
response.getStatus());
     }
+  }
 
-    log.info("totalDocs=" + totalDocs.intValue());
-
-    assertTrue(indexingErrors.toString(), indexingErrors.isEmpty());
+  /**
+   * Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
+   * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION} threads
+   * by calling {@link #createQueryThreads(int)}, starts them concurrently and waits for them to finish
+   * before going to the next iteration. To pace it better with indexing, a {@link #DELAY_MS_BETWEEN_EACH_QUERY_ITERATION}
+   * delay is added between each query iteration.
+   */
+  private void includeQueries() throws Exception {
+    Thread t = new Thread(() -> {
+      try {
+        while (!testState.stopRunning.get()) {
+          int numQueryThreads = 
random().nextInt(MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION) + 1;
+          Thread[] queryThreads = createQueryThreads(numQueryThreads);
+          for (int j = 0; j < numQueryThreads; j++) {
+            queryThreads[j].start();
+          }
+          for (int j = 0; j < numQueryThreads; j++) {
+            queryThreads[j].join();
+          }
+          Thread.sleep(DELAY_MS_BETWEEN_EACH_QUERY_ITERATION);
+        }
+      } catch (Exception ex) {
+        testState.queryErrors.add(ex.getMessage());
+      }
+    });
+    testState.includeThread(t);
+  }
 
-    if(interleaver != null) {
-      assertNull(interleaver.error, interleaver.error);
+  /**
+   * Creates {@code numQueryThreads} threads, each querying for all docs ("*:*").
+   */
+  private Thread[] createQueryThreads(int numQueryThreads) throws Exception {
+    log.info("numQueryThreads=" + numQueryThreads);
+    Thread[] queryThreads = new Thread[numQueryThreads];
+    for (int i = 0; i < numQueryThreads && !testState.stopRunning.get(); i++) {
+      queryThreads[i] = new Thread(() -> {
+        try {
+          /** 
+           * Don't have a way to ensure freshness of results yet. When we add 
something for query freshness later 
+           * we may use that here.
+           *
+           * {@link SolrProcessTracker#corePullTracker} cannot help in 
concurrent query scenarios since there
+           * is no one-to-one guarantee between query and an async pull.
+           */
+          cluster.getSolrClient().query(COLLECTION_NAME, new 
ModifiableSolrParams().set("q", "*:*"));
+        } catch (Exception ex) {
+          testState.queryErrors.add(Throwables.getStackTraceAsString(ex));
+        }
+      });
     }
+    return queryThreads;
+  }
 
-    assertFalse("no progress recorded", progress.isEmpty());
-
-    log.info(progress.toString());
-
-    assertCriticalSections(progress);
+  /**
+   * Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
+   * In each iteration it fails over to a new leader by calling {@link #failOver()}. It waits
+   * for {@link #DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION} between each iteration.
+   */
+  private void includeFailovers() throws Exception {
+    Thread t = new Thread(() -> {
+      try {
+        while (!testState.stopRunning.get()) {
+          failOver();
+          Thread.sleep(DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION);
+        }
+      } catch (Exception ex) {
+        testState.failoverError = Throwables.getStackTraceAsString(ex);
+      }
+    });
+    testState.includeThread(t);
+  }
 
-    // verify the update wasn't forwarded to the follower and it didn't commit 
by checking the core
-    // this gives us confidence that the subsequent query we do triggers the 
pull
-    CoreContainer replicaCC = getCoreContainer(followerReplica.getNodeName());
-    SolrCore core = null;
-    SolrClient followerDirectClient = null;
-    SolrClient leaderDirectClient = null;
-    try {
-      core = replicaCC.getCore(followerReplica.getCoreName());
-      // the follower should only have the default segments file
-      assertEquals(1, 
core.getDeletionPolicy().getLatestCommit().getFileNames().size());
+  /**
+   * Kills the current leader, waits for a new leader to be elected, and then brings the killed leader back up
+   * as a follower replica. Before bringing the replica back up it randomly decides whether to delete its core directory.
+   */
+  private void failOver() throws Exception {
+    DocCollection collection = getCollection();
+    Replica leaderReplicaBeforeSwitch = collection.getLeader(SHARD_NAME);
+    final String leaderReplicaNameBeforeSwitch = 
leaderReplicaBeforeSwitch.getName();
+    JettySolrRunner shardLeaderSolrRunnerBeforeSwitch = 
cluster.getReplicaJetty(leaderReplicaBeforeSwitch);
+    File leaderIndexDirBeforeSwitch = new 
File(shardLeaderSolrRunnerBeforeSwitch.getCoreContainer().getCoreRootDirectory()
+        + "/" + leaderReplicaBeforeSwitch.getCoreName());
+
+    shardLeaderSolrRunnerBeforeSwitch.stop();
+    cluster.waitForJettyToStop(shardLeaderSolrRunnerBeforeSwitch);
+    waitForState("Timed out waiting for new replica to become leader", 
COLLECTION_NAME, (liveNodes, collectionState) -> {
+      Slice slice = collectionState.getSlice(SHARD_NAME);
+      if (slice.getLeader() == null) {
+        return false;
+      }
+      if (slice.getLeader().getName().equals(leaderReplicaNameBeforeSwitch)) {
+        return false;
+      }
 
-      // query the leader directly to verify it should have the document
-      leaderDirectClient = getHttpSolrClient(shardLeaderReplica.getBaseUrl() + 
"/" + shardLeaderReplica.getCoreName());
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params
-          .set("q", "*:*")
-          .set("distrib", "false");
-      QueryResponse resp = leaderDirectClient.query(params);
-      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
-
-      // we want to wait until the pull completes so set up a count down latch 
for the follower's
-      // core that we'll wait until pull finishes for
-      CountDownLatch latch = new CountDownLatch(1);
-      Map<String, CountDownLatch> asyncPullTasks = 
solrProcessesTaskTracker.get(followerReplica.getNodeName());
-      asyncPullTasks.put(followerReplica.getCoreName(), latch);
+      return true;
+    });
 
-      // query the follower directly to trigger the pull, this query should 
yield no results
-      // as it returns immediately 
-      followerDirectClient = getHttpSolrClient(followerReplica.getBaseUrl() + 
"/" + followerReplica.getCoreName());
-      resp = followerDirectClient.query(params);
-      assertEquals(0, resp.getResults().getNumFound());
+    if (random().nextBoolean()) {
+      FileUtils.deleteDirectory(leaderIndexDirBeforeSwitch);
+    }
 
-      // wait until pull is finished
-      assertTrue(latch.await(120, TimeUnit.SECONDS));
+    shardLeaderSolrRunnerBeforeSwitch.start();
+    cluster.waitForNode(shardLeaderSolrRunnerBeforeSwitch, -1);
 
-      // do another query to verify we've pulled everything
-      resp = followerDirectClient.query(params);
+    waitForState("Timed out waiting for restarted replica to become active", 
COLLECTION_NAME, (liveNodes, collectionState) -> {
+      Slice slice = collectionState.getSlice(SHARD_NAME);
+      if (slice.getReplica(leaderReplicaNameBeforeSwitch).getState() != 
Replica.State.ACTIVE) {
+        return false;
+      }
+      return true;
+    });
 
-      // verify we pulled
-      
assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 
1);
+    setupSolrProcess(shardLeaderSolrRunnerBeforeSwitch);
+  }
 
-      // verify the document is present
-      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
-    } finally {
-      if (leaderDirectClient != null) {
-        leaderDirectClient.close();
-      }
-      if (followerDirectClient != null) {
-        followerDirectClient.close();
-      }
-      if (core != null) {
-        core.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
+  /**
+   * Goes over all the lives of each node (a node gets a new life on restart) and then over each core's concurrency
+   * stages in each life. Logs the concurrency stages in the order they occurred and then analyzes those stages to
+   * make sure no critical section was breached.
+   */
+  private void analyzeCoreConcurrencyStagesForBreaches() {
+    // Goes over each node
+    for (Map.Entry<String, List<SolrProcessTracker>> nodeTracker :
+        testState.solrNodesTracker.entrySet()) {
+      String nodeName = nodeTracker.getKey();
+      int lifeCountForNode = nodeTracker.getValue().size();
+      // Goes over each life of a node
+      for (int i = 0; i < lifeCountForNode; i++) {
+        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> 
coreConcurrencyStageTracker = 
nodeTracker.getValue().get(i).coreConcurrencyStageTracker;
+        if (coreConcurrencyStageTracker.isEmpty()) {
+          log.info("life " + (i + 1) + "/" + lifeCountForNode + " of node " + 
nodeName + " is empty");
+        } else {
+          // Goes over each core
+          for (Map.Entry<String, ConcurrentLinkedQueue<String>> 
coreConcurrencyStagesEntry : coreConcurrencyStageTracker.entrySet()) {
+            String coreName = coreConcurrencyStagesEntry.getKey();
+            List<String> coreConcurrencyStages = new 
ArrayList<>(coreConcurrencyStagesEntry.getValue());
+            // Log lines are truncated beyond a certain length, therefore printing them in batches of 200
+            List<List<String>> batches = 
Lists.partition(coreConcurrencyStages, 200);
+            if (batches.isEmpty()) {
+              batches = new ArrayList<>(1);
+              batches.add(new ArrayList<>(0));
+            }
+            for (int j = 0; j < batches.size(); j++) {
+              log.info("batch " + (j + 1) + "/" + batches.size()
+                  + " of core " + coreName
+                  + " of life " + (i + 1) + "/" + lifeCountForNode
+                  + " of node " + nodeName
+                  + "\n" + batches.get(j).toString());
+            }
+            analyzeCoreConcurrencyStagesForBreaches(coreName, 
coreConcurrencyStages);
+          }
+        }
       }
-      // clean up the shared store. The temp dir should clean up itself after 
the test class finishes
-      FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
     }
   }
 
-  private void assertCriticalSections(List<String> progress) {
-    String currentThreadId = null;
+  /**
+   * Analyzes a core's concurrency stages to make sure no critical section was breached.
+   */
+  private void analyzeCoreConcurrencyStagesForBreaches(String coreName, 
List<String> coreConcurrencyStages) {
     SharedCoreStage currentStage = null;
-    String prevThreadId = null;
-    SharedCoreStage prevStage = null;
+    int activePullers = 0; // number of threads that have started pulling and 
not finished
     int activeIndexers = 0; // number of threads that have started indexing 
and not finished 
-    int activeBlobPushers = 0; // number of threads that are actively pushing 
at any given time
-    for (String p : progress) {
-      String[] parts = p.split("\\.");
-      currentThreadId = parts[0];
+    int activePushers = 0; // number of threads that are actively pushing at 
any given time
+    for (String s : coreConcurrencyStages) {
+      String[] parts = s.split("\\.");
       currentStage = SharedCoreStage.valueOf(parts[1]);
-      if (currentStage == SharedCoreStage.LocalIndexingStarted) {
+
+      if (currentStage == SharedCoreStage.BLOB_PULL_STARTED) {
+        activePullers++;
+      } else if (currentStage == SharedCoreStage.BLOB_PULL_FINISHED) {
+        activePullers--;
+      } else if (currentStage == SharedCoreStage.LOCAL_INDEXING_STARTED) {
         activeIndexers++;
-      } else if (currentStage == SharedCoreStage.BlobPushStarted) {
-        activeBlobPushers++;
-      } else if (currentStage == SharedCoreStage.BlobPushFinished) {
-        // both blob pushing and indexing finish at this stage 
-        activeBlobPushers--;
+      } else if (currentStage == SharedCoreStage.BLOB_PUSH_STARTED) {
+        activePushers++;
+      } else if (currentStage == SharedCoreStage.BLOB_PUSH_FINISHED) {
+        activePushers--;
+      } else if (currentStage == SharedCoreStage.INDEXING_BATCH_FINISHED) {
         activeIndexers--;
       }
 
-      // making sure no other activity takes place during pull
-      if (prevStage == SharedCoreStage.BlobPullStarted) {
-        assertTrue("Pull critical section breached, currentStage=" + p, 
prevThreadId==currentThreadId && currentStage == 
SharedCoreStage.BlobPullFinished);
-      }
-
-      // making sure indexing is not disrupted by a pull from blob
-      assertFalse("Indexing breached by a pull, currentStage=" + p,
-          activeIndexers > 0 &&
-              (currentStage == SharedCoreStage.BlobPullStarted || currentStage 
== SharedCoreStage.BlobPullFinished));
+      // making sure no other activity (including another pull) takes place 
during pull
+      assertFalse("Pull and indexing are interleaved, coreName=" + coreName + 
" currentStage=" + s, activePullers > 1 || (activePullers > 0 && 
(activeIndexers > 0 || activePushers > 0)));
 
       // making sure push to blob are not disrupted by another push to blob
-      assertFalse("Blob push breached by another blob push,  currentStage=" + 
p, activeBlobPushers > 1);
+      assertFalse("Blob push breached by another blob push, coreName=" + 
coreName + " currentStage=" + s, activePushers > 1);
+    }
+  }
 
-      prevThreadId = currentThreadId;
-      prevStage = currentStage;
+  /**
+   * Adds a new replica.
+   */
+  private Replica addReplica() throws Exception {
+    List<String> existingReplicas = 
getCollection().getSlice(SHARD_NAME).getReplicas().stream().map(r -> 
r.getName()).collect(Collectors.toList());
+    // add another replica
+    assertTrue(CollectionAdminRequest.addReplicaToShard(COLLECTION_NAME, 
SHARD_NAME, Replica.Type.SHARED)
+        .process(cluster.getSolrClient()).isSuccess());
+    // Verify that new replica is created
+    waitForState("Timed-out waiting for new replica to be created", 
COLLECTION_NAME, clusterShape(1, existingReplicas.size() + 1));
+
+    Replica newReplica = null;
+
+    for (Replica r : getCollection().getSlice(SHARD_NAME).getReplicas()) {
+      if (!existingReplicas.contains(r.getName())) {
+        newReplica = r;
+        break;
+      }
     }
+
+    assertNotNull("Could not find new replica", newReplica);
+
+    return newReplica;
   }
 
-  private void configureTestSharedConcurrencyControllerForNode(JettySolrRunner 
runner, List<String> progress) {
-    SharedCoreConcurrencyController concurrencyController = new 
SharedCoreConcurrencyController(runner.getCoreContainer()) {
-      Object recorderLock = new Object();
-      @Override
-      public void recordState(String collectionName, String shardName, String 
coreName, SharedCoreStage stage) {
-        super.recordState(collectionName, shardName, coreName, stage);
+  /**
+   * Directly queries the {@code replica} and verifies that all the docs indexed (after accounting for deletions) are found.
 
 Review comment:
   Might be worth adding a note that this is testing an empty replica hydrating after the concurrent tests.
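
   For example, a rough sketch of what such a note might look like (the method signature here is assumed from the call site in run(), and the exact wording is only a suggestion):

       /**
        * Directly queries the {@code replica} and verifies that all the docs indexed
        * (after accounting for deletions) are found.
        *
        * Note: the replica passed in is a freshly added, empty SHARED replica, so this also
        * exercises an empty core hydrating its full index from the shared store after the
        * concurrent indexing/query/failover phases have finished.
        */
       private void queryNewReplicaAndVerifyAllDocsFound(Replica replica) throws Exception {
         // signature assumed from the call site in run(); body unchanged
       }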
