andyvuong commented on a change in pull request #1081: SOLR-13101: Concurrency tests for SHARED collection. URL: https://github.com/apache/lucene-solr/pull/1081#discussion_r358401798
########## File path: solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java ########## @@ -49,411 +58,673 @@ /** * Tests around synchronization of concurrent indexing, pushes and pulls * happening on a core of a shared collection {@link DocCollection#getSharedIndex()} - * todo: add tests for failover scenarios and involve query pulls */ public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + private static final String COLLECTION_NAME = "sharedCollection"; + private static final String SHARD_NAME = "shard1"; + /** + * Number of serial indexing iterations for each test. This is the main setting, queries and failover iterations + * stop after indexing ends. Higher the value, longer the tests would run. + */ + private static int INDEXING_ITERATIONS = TEST_NIGHTLY ? 100 : 20; + /** + * Maximum number of concurrent indexing requests per indexing iteration. + */ + private static int MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION = 10; + /** + * Maximum number of docs per indexing request. + */ + private static int MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST = 100; + /** + * Indexing can fail because of leader failures (especially when test {@link #includeFailovers()}). + * The test will re-attempt up till this number of times before bailing out. For test to succeed, + * indexing request have to succeed in these many attempts. + */ + private static int MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST = 10; + /** + * Maximum number of concurrent query requests per query iteration. + */ + private static int MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION = 10; + /** + * Indexing is faster than indexing, to pace it better with indexing, a delay is added between each query iteration. + */ + private static int DELAY_MS_BETWEEN_EACH_QUERY_ITERATION = 50; + /** + * Minimum time between each failover. 
+ */ + private static int DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION = 500; + /** + * Path for local shared store + */ private static Path sharedStoreRootPath; + /** + * Manages test state from start to end. + */ + private TestState testState; + @BeforeClass public static void setupClass() throws Exception { sharedStoreRootPath = createTempDir("tempDir"); } + @Before + public void setupTest() throws Exception { + int numNodes = 4; + setupCluster(numNodes); + testState = new TestState(); + setupSolrNodesForTest(); + + int maxShardsPerNode = 1; + // One less than number of nodes. + // The extra node will be used at the end of test to verify + // the contents of shared store by querying for all docs on a new replica. + int numReplicas = numNodes - 1; + // Later on we can consider choosing random number of shards and replicas. + // To handle multiple shards, we need to update code where SHARD_NAME is used. + setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, numReplicas, SHARD_NAME); + } + + @After + public void teardownTest() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + FileUtils.cleanDirectory(sharedStoreRootPath.toFile()); + } + /** - * Tests issuing random number of concurrent indexing requests in a given range for a shared core and making sure they all succeed. + * Tests that concurrent indexing succeed. */ @Test - public void testHighConcurrentIndexing() throws Exception { - int maxIndexingThreads = 100; - int maxDocsPerThread = 100; - testConcurrentIndexing(maxIndexingThreads, maxDocsPerThread); + public void testIndexing() throws Exception { + final boolean includeDeletes = false; + includeIndexing(includeDeletes); + run(); } /** - * Test ensuring two indexing requests interleave in desired ways and succeed. + * Tests that concurrent indexing with concurrent queries succeed. 
*/ - // @Test todo: leaking threads, test only issue - public void TODOtestPossibleInterleaving() throws Exception { - - // completely serialized - testConcurrentIndexing( - new IndexingThreadInterleaver( - SharedCoreStage.BlobPushFinished, - SharedCoreStage.LocalIndexingStarted, - true)); - - // second pushes for first one too - testConcurrentIndexing( - new IndexingThreadInterleaver( - SharedCoreStage.LocalIndexingFinished, - SharedCoreStage.BlobPushFinished, - true)); + @Test + public void testIndexingQueries() throws Exception { + final boolean includeDeletes = false; + includeIndexing(includeDeletes); + includeQueries(); + run(); } /** - * Test ensuring two indexing requests do not interleave in impossible ways even when forced and still succeed. + * Tests that concurrent indexing with deletes and concurrent queries succeed. */ - // @Test todo: leaking threads, test only issue - public void TODOtestImpossibleInterleaving() throws Exception { - // push critical section - testConcurrentIndexing( - new IndexingThreadInterleaver( - SharedCoreStage.ZkUpdateFinished, - SharedCoreStage.ZkUpdateFinished, - false)); - - // another push critical section - testConcurrentIndexing( - new IndexingThreadInterleaver( - SharedCoreStage.ZkUpdateFinished, - SharedCoreStage.LocalCacheUpdateFinished, - false)); + @Test + public void testIndexingQueriesDeletes() throws Exception { + final boolean includeDeletes = true; + includeIndexing(includeDeletes); + includeQueries(); + run(); } - private void testConcurrentIndexing(int maxIndexingThreads, int maxDocsPerThread) throws Exception { - int numIndexingThreads = new Random().nextInt(maxIndexingThreads) + 1;; - testConcurrentIndexing(numIndexingThreads, maxDocsPerThread, null); + /** + * Tests that concurrent indexing with deletes, concurrent queries and explicit failovers succeed. + */ + // @Test + // TODO: This test flaps time to time. The symptom of the failure is missing docs i.e. 
indexing is declared successful + // but query could not reproduce all of the docs. I was able to repro this with NRT collection on vanilla 8.3 too. + // I have not root caused it yet. Keeping this test disabled, until the problem is root caused and fixed. + public void todoTestIndexingQueriesDeletesFailovers() throws Exception { + final boolean includeDeletes = true; + includeIndexing(includeDeletes); + includeQueries(); + includeFailovers(); + run(); } - private void testConcurrentIndexing(IndexingThreadInterleaver interleaver) throws Exception { - testConcurrentIndexing(2, 10, interleaver); + /** + * It starts all the threads that are included in the test (indexing, queries and failovers) in parallel. + * Then wait for them to finish (run length depends on {@link #INDEXING_ITERATIONS}). + * At the end it makes sures that no critical section was breached and no unexpected error occurred. + * Then verify the contents of shared store by querying for all docs on a new replica. + */ + private void run() throws Exception { + testState.startIncludedThreads(); + testState.waitForThreadsToStop(); + analyzeCoreConcurrencyStagesForBreaches(); + testState.checkErrors(); + Replica newReplica = addReplica(); + queryNewReplicaAndVerifyAllDocsFound(newReplica); } /** - * Start desired number of concurrent indexing threads with each indexing random number of docs between 1 and maxDocsPerThread. + * Adds a thread to test, that goes over {@link #INDEXING_ITERATIONS} or until it is interrupted. + * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION} threads + * by calling {@link #createIndexingThreads(int, int, boolean)}, starts them concurrently and wait for them to finish + * before going to next iteration. Each indexing thread adds between 1 and {@link #MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST} + * docs. * - * At the end it verifies everything got indexed successfully on leader. No critical section got breached. 
- * Also verify the integrity of shared store contents by pulling them on a follower replica. + * @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing requests + * or not */ - private void testConcurrentIndexing(int numIndexingThreads, int maxDocsPerThread, IndexingThreadInterleaver interleaver) throws Exception { - setupCluster(2); - CloudSolrClient cloudClient = cluster.getSolrClient(); - - // this map tracks the async pull queues per solr process - Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>(); - - JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0); - CoreStorageClient storageClient1 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME); - setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1), solrProcess1); - Map<String, CountDownLatch> asyncPullLatches1 = configureTestBlobProcessForNode(solrProcess1); - - JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1); - CoreStorageClient storageClient2 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME); - setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2), solrProcess2); - Map<String, CountDownLatch> asyncPullLatches2 = configureTestBlobProcessForNode(solrProcess2); - - solrProcessesTaskTracker.put(solrProcess1.getNodeName(), asyncPullLatches1); - solrProcessesTaskTracker.put(solrProcess2.getNodeName(), asyncPullLatches2); - - String collectionName = "sharedCollection"; - int maxShardsPerNode = 1; - int numReplicas = 2; - // specify a comma-delimited string of shard names for multiple shards when using - // an implicit router - String shardNames = "shard1"; - setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames); - - // get the leader replica and follower replicas - DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName); - 
Replica shardLeaderReplica = collection.getLeader("shard1"); - Replica followerReplica = null; - for (Replica repl : collection.getSlice("shard1").getReplicas()) { - if (repl.getName() != shardLeaderReplica.getName()) { - followerReplica = repl; - break; + private void includeIndexing(boolean includeDeletes) { + Thread t = new Thread(() -> { + try { + for (int i = 0; i < INDEXING_ITERATIONS && !testState.stopRunning.get(); i++) { + int numIndexingThreads = random().nextInt(MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION) + 1; + int numDocsToAddPerThread = random().nextInt(MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST) + 1; + Thread[] indexingThreads = createIndexingThreads(numIndexingThreads, numDocsToAddPerThread, includeDeletes); + for (int j = 0; j < numIndexingThreads; j++) { + indexingThreads[j].start(); + } + for (int j = 0; j < numIndexingThreads; j++) { + indexingThreads[j].join(); + } + if (Thread.interrupted()) { + // we have been interrupted so we will stop running + testState.stopRunning.set(true); + } + } + } catch (Exception ex) { + testState.indexingErrors.add(ex.getMessage()); } - } + // everything else stops running when indexing finishes + testState.stopRunning.set(true); + }); + testState.includeThread(t); + } - - JettySolrRunner shardLeaderSolrRunner = null; - for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { - if(solrRunner.getNodeName().equals(shardLeaderReplica.getNodeName())){ - shardLeaderSolrRunner = solrRunner; - break; - } - } - List<String> progress = new ArrayList<>(); - - if(interleaver != null) { - configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress, interleaver); - } else { - configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress); - } - - AtomicInteger totalDocs= new AtomicInteger(0); + /** + * Creates {@code numIndexingThreads} threads with each adding {@code numDocsToAddPerThread}. 
+ * + * @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing requests + * or not + */ + private Thread[] createIndexingThreads(int numIndexingThreads, int numDocsToAddPerThread, boolean includeDeletes) throws Exception { log.info("numIndexingThreads=" + numIndexingThreads); Thread[] indexingThreads = new Thread[numIndexingThreads]; - ConcurrentLinkedQueue<String> indexingErrors = new ConcurrentLinkedQueue<>(); - for (int i = 0; i < numIndexingThreads; i++) { + for (int i = 0; i < numIndexingThreads && !testState.stopRunning.get(); i++) { indexingThreads[i] = new Thread(() -> { - try { - // index between 1 to maxDocsPerThread docs - int numDocs = new Random().nextInt(maxDocsPerThread) + 1; - log.info("numDocs=" + numDocs); - UpdateRequest updateReq = new UpdateRequest(); - for (int j = 0; j < numDocs; j++) { - int docId = totalDocs.incrementAndGet(); - updateReq.add("id", Integer.toString(docId)); + List<String> idsToAdd = new ArrayList<>(); + // prepare the list of docs to add and delete outside the reattempt loop + for (int j = 0; j < numDocsToAddPerThread; j++) { + String docId = Integer.toString(testState.docIdGenerator.incrementAndGet()); + idsToAdd.add(docId); + } + List<String> idsToDelete = testState.idBatchesToDelete.poll(); + + // attempt until succeeded or max attempts + for (int j = 0; j < MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST; j++) { + try { + String message = "attempt=" + (j + 1) + " numDocsToAdd=" + numDocsToAddPerThread + " docsToAdd=" + idsToAdd.toString(); + if (idsToDelete != null) { + message += " numDocsToDelete=" + idsToDelete.size() + " docsToDelete=" + idsToDelete.toString(); + } + log.info(message); + + UpdateRequest updateReq = new UpdateRequest(); + for (int k = 0; k < idsToAdd.size(); k++) { + updateReq.add("id", idsToAdd.get(k)); + } + if (includeDeletes && idsToDelete != null) { + updateReq.deleteById(idsToDelete); + } + processUpdateRequest(updateReq); + + 
testState.numDocsIndexed.addAndGet(numDocsToAddPerThread); + if (idsToDelete != null) { + testState.idsDeleted.addAll(idsToDelete); + } + + // randomly select some docs that can be deleted + if (includeDeletes) { + List<String> idsThatCanBeDeleted = new ArrayList<>(); + for (String indexedId : idsToAdd) { + if (random().nextBoolean()) { + idsThatCanBeDeleted.add(indexedId); + } + } + if (!idsThatCanBeDeleted.isEmpty()) { + testState.idBatchesToDelete.offer(idsThatCanBeDeleted); + } + } + // indexing was successful, stop attempting + break; + } catch (Exception ex) { + // last attempt also failed, record the error + if (j == MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST - 1) { + testState.indexingErrors.add(Throwables.getStackTraceAsString(ex)); + } } - updateReq.commit(cloudClient, collectionName); - } catch (Exception ex) { - indexingErrors.add(ex.getMessage()); } }); } + return indexingThreads; + } - for (int i = 0; i < numIndexingThreads; i++) { - indexingThreads[i].start(); - } + /** + * Sends update request to the server, randomly choosing whether to send it with commit=true or not + */ + private void processUpdateRequest(UpdateRequest request) throws Exception { + UpdateResponse response = random().nextBoolean() Review comment: Just wondering: what is the reason for randomizing this? We implicitly hard commit regardless, so it seems the net effect is an additional commit request at the end of the client indexing flow. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org