andyvuong commented on a change in pull request #1081: SOLR-13101: Concurrency 
tests for SHARED collection.
URL: https://github.com/apache/lucene-solr/pull/1081#discussion_r358401798
 
 

 ##########
 File path: 
solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
 ##########
 @@ -49,411 +58,673 @@
 /**
  * Tests around synchronization of concurrent indexing, pushes and pulls
  * happening on a core of a shared collection {@link 
DocCollection#getSharedIndex()}
- * todo: add tests for failover scenarios and involve query pulls 
  */
 public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  private static final String COLLECTION_NAME = "sharedCollection";
+  private static final String SHARD_NAME = "shard1";
+  /**
+   * Number of serial indexing iterations for each test. This is the main setting: query and failover
+   * iterations stop after indexing ends. The higher the value, the longer the tests run.
+   */
+  private static int INDEXING_ITERATIONS = TEST_NIGHTLY ? 100 : 20;
+  /**
+   * Maximum number of concurrent indexing requests per indexing iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION = 
10;
+  /**
+   * Maximum number of docs per indexing request.
+   */
+  private static int MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST = 100;
+  /**
+   * Indexing can fail because of leader failures (especially when the test includes failovers,
+   * {@link #includeFailovers()}). The test will re-attempt up to this many times before bailing out.
+   * For the test to succeed, each indexing request has to succeed within these attempts.
+   */
+  private static int MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST = 10;
+  /**
+   * Maximum number of concurrent query requests per query iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION = 10;
+  /**
+   * Querying is faster than indexing; to pace queries with indexing, a delay is added between each query iteration.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_QUERY_ITERATION = 50;
+  /**
+   * Minimum time between each failover.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION = 500;
+  /**
+   * Path for the local shared store.
+   */
   private static Path sharedStoreRootPath;
 
+  /**
+   * Manages test state from start to end.
+   */
+  private TestState testState;
+
   @BeforeClass
   public static void setupClass() throws Exception {
     sharedStoreRootPath = createTempDir("tempDir");
   }
 
+  @Before
+  public void setupTest() throws Exception {
+    int numNodes = 4;
+    setupCluster(numNodes);
+    testState = new TestState();
+    setupSolrNodesForTest();
+
+    int maxShardsPerNode = 1;
+    // One less than the number of nodes.
+    // The extra node will be used at the end of the test to verify
+    // the contents of the shared store by querying for all docs on a new replica.
+    int numReplicas = numNodes - 1;
+    // Later on we can consider choosing a random number of shards and replicas.
+    // To handle multiple shards, we would need to update the code where SHARD_NAME is used.
+    setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, 
numReplicas, SHARD_NAME);
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+  }
+
   /**
-   * Tests issuing random number of concurrent indexing requests in a given 
range for a shared core and making sure they all succeed.
+   * Tests that concurrent indexing succeeds.
    */
   @Test
-  public void testHighConcurrentIndexing() throws Exception {
-    int maxIndexingThreads = 100;
-    int maxDocsPerThread = 100;
-    testConcurrentIndexing(maxIndexingThreads, maxDocsPerThread);
+  public void testIndexing() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    run();
   }
 
   /**
-   * Test ensuring two indexing requests interleave in desired ways and 
succeed.
+   * Tests that concurrent indexing with concurrent queries succeeds.
    */
-  //  @Test todo: leaking threads, test only issue
-  public void TODOtestPossibleInterleaving() throws Exception {
-
-    // completely serialized
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.BlobPushFinished,
-            SharedCoreStage.LocalIndexingStarted,
-            true));
-
-    // second pushes for first one too
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.LocalIndexingFinished, 
-            SharedCoreStage.BlobPushFinished,
-            true));
+  @Test
+  public void testIndexingQueries() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
 
   /**
-   * Test ensuring two indexing requests do not interleave in impossible ways 
even when forced and still succeed.
+   * Tests that concurrent indexing with deletes and concurrent queries succeeds.
    */
-  //  @Test todo: leaking threads, test only issue
-  public void TODOtestImpossibleInterleaving() throws Exception {
-    // push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.ZkUpdateFinished,
-            false));
-
-    // another push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.LocalCacheUpdateFinished,
-            false));
+  @Test
+  public void testIndexingQueriesDeletes() throws Exception {
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
 
-  private void testConcurrentIndexing(int maxIndexingThreads, int 
maxDocsPerThread) throws Exception {
-    int numIndexingThreads = new Random().nextInt(maxIndexingThreads) + 1;;
-    testConcurrentIndexing(numIndexingThreads, maxDocsPerThread, null);
+  /**
+   * Tests that concurrent indexing with deletes, concurrent queries and explicit failovers succeeds.
+   */
+  // @Test
+  // TODO: This test flaps from time to time. The symptom of the failure is missing docs, i.e. indexing is
+  //       declared successful but a query could not find all of the docs. I was able to repro this with an
+  //       NRT collection on vanilla 8.3 too. I have not root caused it yet. Keeping this test disabled until
+  //       the problem is root caused and fixed.
+  public void todoTestIndexingQueriesDeletesFailovers() throws Exception {
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    includeFailovers();
+    run();
   }
 
-  private void testConcurrentIndexing(IndexingThreadInterleaver interleaver) 
throws Exception {
-    testConcurrentIndexing(2, 10, interleaver);
+  /**
+   * Starts all the threads included in the test (indexing, queries and failovers) in parallel,
+   * then waits for them to finish (run length depends on {@link #INDEXING_ITERATIONS}).
+   * At the end it makes sure that no critical section was breached and no unexpected error occurred,
+   * then verifies the contents of the shared store by querying for all docs on a new replica.
+   */
+  private void run() throws Exception {
+    testState.startIncludedThreads();
+    testState.waitForThreadsToStop();
+    analyzeCoreConcurrencyStagesForBreaches();
+    testState.checkErrors();
+    Replica newReplica = addReplica();
+    queryNewReplicaAndVerifyAllDocsFound(newReplica);
   }
 
   /**
-   * Start desired number of concurrent indexing threads with each indexing 
random number of docs between 1 and maxDocsPerThread.
+   * Adds a thread to the test that runs {@link #INDEXING_ITERATIONS} iterations or until it is interrupted.
+   * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION}
+   * threads by calling {@link #createIndexingThreads(int, int, boolean)}, starts them concurrently and waits
+   * for them to finish before going to the next iteration. Each indexing thread adds between 1 and
+   * {@link #MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST} docs.
    *
-   * At the end it verifies everything got indexed successfully on leader. No 
critical section got breached.
-   * Also verify the integrity of shared store contents by pulling them on a 
follower replica.
+   * @param includeDeletes whether to randomly mark some docs for deletion and delete them
+   *                       in subsequent indexing requests
    */
-  private void testConcurrentIndexing(int numIndexingThreads, int 
maxDocsPerThread, IndexingThreadInterleaver interleaver) throws Exception {
-    setupCluster(2);
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-
-    // this map tracks the async pull queues per solr process
-    Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new 
HashMap<>();
-
-    JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
-    CoreStorageClient storageClient1 = 
setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1),
 solrProcess1);
-    Map<String, CountDownLatch> asyncPullLatches1 = 
configureTestBlobProcessForNode(solrProcess1);
-
-    JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
-    CoreStorageClient storageClient2 = 
setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2),
 solrProcess2);
-    Map<String, CountDownLatch> asyncPullLatches2 = 
configureTestBlobProcessForNode(solrProcess2);
-
-    solrProcessesTaskTracker.put(solrProcess1.getNodeName(), 
asyncPullLatches1);
-    solrProcessesTaskTracker.put(solrProcess2.getNodeName(), 
asyncPullLatches2);
-
-    String collectionName = "sharedCollection";
-    int maxShardsPerNode = 1;
-    int numReplicas = 2;
-    // specify a comma-delimited string of shard names for multiple shards 
when using
-    // an implicit router
-    String shardNames = "shard1";
-    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, 
numReplicas, shardNames);
-
-    // get the leader replica and follower replicas
-    DocCollection collection = 
cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica shardLeaderReplica = collection.getLeader("shard1");
-    Replica followerReplica = null;
-    for (Replica repl : collection.getSlice("shard1").getReplicas()) {
-      if (repl.getName() != shardLeaderReplica.getName()) {
-        followerReplica = repl;
-        break;
+  private void includeIndexing(boolean includeDeletes) {
+    Thread t = new Thread(() -> {
+      try {
+        for (int i = 0; i < INDEXING_ITERATIONS && 
!testState.stopRunning.get(); i++) {
+          int numIndexingThreads = 
random().nextInt(MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION) + 1;
+          int numDocsToAddPerThread = 
random().nextInt(MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST) + 1;
+          Thread[] indexingThreads = createIndexingThreads(numIndexingThreads, 
numDocsToAddPerThread, includeDeletes);
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].start();
+          }
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].join();
+          }
+          if (Thread.interrupted()) {
+            // we have been interrupted so we will stop running
+            testState.stopRunning.set(true);
+          }
+        }
+      } catch (Exception ex) {
+        testState.indexingErrors.add(ex.getMessage());
       }
-    }
+      // everything else stops running when indexing finishes
+      testState.stopRunning.set(true);
+    });
+    testState.includeThread(t);
+  }
 
-    
-   JettySolrRunner shardLeaderSolrRunner = null;
-   for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
-     if(solrRunner.getNodeName().equals(shardLeaderReplica.getNodeName())){
-       shardLeaderSolrRunner = solrRunner;
-       break;
-     }
-   }
-   List<String> progress = new ArrayList<>();
-
-   if(interleaver != null) {
-     configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, 
progress, interleaver);
-   } else {
-     configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, 
progress);
-   }
-
-    AtomicInteger totalDocs= new AtomicInteger(0);
+  /**
+   * Creates {@code numIndexingThreads} threads, each adding {@code numDocsToAddPerThread} docs.
+   *
+   * @param includeDeletes whether to randomly mark some docs for deletion and delete them
+   *                       in subsequent indexing requests
+   */
+  private Thread[] createIndexingThreads(int numIndexingThreads, int 
numDocsToAddPerThread, boolean includeDeletes) throws Exception {
     log.info("numIndexingThreads=" + numIndexingThreads);
     Thread[] indexingThreads = new Thread[numIndexingThreads];
-    ConcurrentLinkedQueue<String> indexingErrors = new 
ConcurrentLinkedQueue<>();
-    for (int i = 0; i < numIndexingThreads; i++) {
+    for (int i = 0; i < numIndexingThreads && !testState.stopRunning.get(); 
i++) {
       indexingThreads[i] = new Thread(() -> {
-        try {
-          // index between 1 to maxDocsPerThread docs
-          int numDocs = new Random().nextInt(maxDocsPerThread) + 1;
-          log.info("numDocs=" + numDocs);
-          UpdateRequest updateReq = new UpdateRequest();
-          for (int j = 0; j < numDocs; j++) {
-            int docId = totalDocs.incrementAndGet();
-            updateReq.add("id", Integer.toString(docId));
+        List<String> idsToAdd = new ArrayList<>();
+        // prepare the list of docs to add and delete outside the reattempt 
loop
+        for (int j = 0; j < numDocsToAddPerThread; j++) {
+          String docId = 
Integer.toString(testState.docIdGenerator.incrementAndGet());
+          idsToAdd.add(docId);
+        }
+        List<String> idsToDelete = testState.idBatchesToDelete.poll();
+
+        // attempt until success or until max attempts are exhausted
+        for (int j = 0; j < MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST; j++) {
+          try {
+            String message = "attempt=" + (j + 1) + " numDocsToAdd=" + 
numDocsToAddPerThread + " docsToAdd=" + idsToAdd.toString();
+            if (idsToDelete != null) {
+              message += " numDocsToDelete=" + idsToDelete.size() + " 
docsToDelete=" + idsToDelete.toString();
+            }
+            log.info(message);
+
+            UpdateRequest updateReq = new UpdateRequest();
+            for (int k = 0; k < idsToAdd.size(); k++) {
+              updateReq.add("id", idsToAdd.get(k));
+            }
+            if (includeDeletes && idsToDelete != null) {
+              updateReq.deleteById(idsToDelete);
+            }
+            processUpdateRequest(updateReq);
+
+            testState.numDocsIndexed.addAndGet(numDocsToAddPerThread);
+            if (idsToDelete != null) {
+              testState.idsDeleted.addAll(idsToDelete);
+            }
+
+            // randomly select some docs that can be deleted
+            if (includeDeletes) {
+              List<String> idsThatCanBeDeleted = new ArrayList<>();
+              for (String indexedId : idsToAdd) {
+                if (random().nextBoolean()) {
+                  idsThatCanBeDeleted.add(indexedId);
+                }
+              }
+              if (!idsThatCanBeDeleted.isEmpty()) {
+                testState.idBatchesToDelete.offer(idsThatCanBeDeleted);
+              }
+            }
+            // indexing was successful, stop attempting
+            break;
+          } catch (Exception ex) {
+            // if the last attempt also failed, record the error
+            if (j == MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST - 1) {
+              
testState.indexingErrors.add(Throwables.getStackTraceAsString(ex));
+            }
           }
-          updateReq.commit(cloudClient, collectionName);
-        } catch (Exception ex) {
-          indexingErrors.add(ex.getMessage());
         }
       });
     }
+    return indexingThreads;
+  }
 
-    for (int i = 0; i < numIndexingThreads; i++) {
-      indexingThreads[i].start();
-    }
+  /**
+   * Sends an update request to the server, randomly choosing whether to send it with commit=true or not.
+   */
+  private void processUpdateRequest(UpdateRequest request) throws Exception {
+    UpdateResponse response = random().nextBoolean()
 
 Review comment:
   Just wondering about the reason for randomizing this? We implicitly hard commit regardless, so it seems the net effect is just an additional commit request at the end of the client indexing flow.
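   
   For context, a minimal sketch of the two paths presumably being randomized between. The diff is truncated at this line, so the continuation below is an assumption, not the actual patch; it uses only the standard SolrJ `UpdateRequest` API and names already visible in this test (`COLLECTION_NAME`, `cluster.getSolrClient()`):
   
       // Hypothetical continuation of the truncated line above, not the actual patch.
       UpdateResponse response = random().nextBoolean()
           // explicit path: sends the adds/deletes together with a hard commit in one request
           ? request.commit(cluster.getSolrClient(), COLLECTION_NAME)
           // plain path: sends the adds/deletes and relies on the implicit hard commit instead
           : request.process(cluster.getSolrClient(), COLLECTION_NAME);
   
   If that is the shape of the method, then with an implicit hard commit in place both branches end up committing, and the explicit branch only adds one extra commit request, which is the point raised above.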
