andyvuong commented on a change in pull request #1081: SOLR-13101: Concurrency tests for SHARED collection.
URL: https://github.com/apache/lucene-solr/pull/1081#discussion_r358429461
##########
File path: solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
##########
@@ -49,411 +58,673 @@
 /**
  * Tests around synchronization of concurrent indexing, pushes and pulls
  * happening on a core of a shared collection {@link DocCollection#getSharedIndex()}
- * todo: add tests for failover scenarios and involve query pulls
  */
 public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  private static final String COLLECTION_NAME = "sharedCollection";
+  private static final String SHARD_NAME = "shard1";
+  /**
+   * Number of serial indexing iterations for each test. This is the main setting: queries and failover iterations
+   * stop after indexing ends. The higher the value, the longer the tests run.
+   */
+  private static int INDEXING_ITERATIONS = TEST_NIGHTLY ? 100 : 20;
+  /**
+   * Maximum number of concurrent indexing requests per indexing iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION = 10;
+  /**
+   * Maximum number of docs per indexing request.
+   */
+  private static int MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST = 100;
+  /**
+   * Indexing can fail because of leader failures (especially when the test includes failovers, {@link #includeFailovers()}).
+   * The test will re-attempt up to this number of times before bailing out. For the test to succeed,
+   * an indexing request has to succeed within these many attempts.
+   */
+  private static int MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST = 10;
+  /**
+   * Maximum number of concurrent query requests per query iteration.
+   */
+  private static int MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION = 10;
+  /**
+   * Querying is faster than indexing; to pace it better with indexing, a delay is added between each query iteration.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_QUERY_ITERATION = 50;
+  /**
+   * Minimum time between each failover.
+   */
+  private static int DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION = 500;
+  /**
+   * Path for local shared store.
+   */
   private static Path sharedStoreRootPath;
+  /**
+   * Manages test state from start to end.
+   */
+  private TestState testState;
+
   @BeforeClass
   public static void setupClass() throws Exception {
     sharedStoreRootPath = createTempDir("tempDir");
   }
+  @Before
+  public void setupTest() throws Exception {
+    int numNodes = 4;
+    setupCluster(numNodes);
+    testState = new TestState();
+    setupSolrNodesForTest();
+
+    int maxShardsPerNode = 1;
+    // One less than number of nodes.
+    // The extra node will be used at the end of the test to verify
+    // the contents of the shared store by querying for all docs on a new replica.
+    int numReplicas = numNodes - 1;
+    // Later on we can consider choosing a random number of shards and replicas.
+    // To handle multiple shards, we need to update code where SHARD_NAME is used.
+    setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, numReplicas, SHARD_NAME);
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+  }
+
   /**
-   * Tests issuing random number of concurrent indexing requests in a given range for a shared core and making sure they all succeed.
+   * Tests that concurrent indexing succeeds.
   */
  @Test
-  public void testHighConcurrentIndexing() throws Exception {
-    int maxIndexingThreads = 100;
-    int maxDocsPerThread = 100;
-    testConcurrentIndexing(maxIndexingThreads, maxDocsPerThread);
+  public void testIndexing() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    run();
   }
   /**
-   * Test ensuring two indexing requests interleave in desired ways and succeed.
+   * Tests that concurrent indexing with concurrent queries succeed.
    */
-  // @Test todo: leaking threads, test only issue
-  public void TODOtestPossibleInterleaving() throws Exception {
-
-    // completely serialized
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.BlobPushFinished,
-            SharedCoreStage.LocalIndexingStarted,
-            true));
-
-    // second pushes for first one too
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.LocalIndexingFinished,
-            SharedCoreStage.BlobPushFinished,
-            true));
+  @Test
+  public void testIndexingQueries() throws Exception {
+    final boolean includeDeletes = false;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
   /**
-   * Test ensuring two indexing requests do not interleave in impossible ways even when forced and still succeed.
+   * Tests that concurrent indexing with deletes and concurrent queries succeed.
    */
-  // @Test todo: leaking threads, test only issue
-  public void TODOtestImpossibleInterleaving() throws Exception {
-    // push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.ZkUpdateFinished,
-            false));
-
-    // another push critical section
-    testConcurrentIndexing(
-        new IndexingThreadInterleaver(
-            SharedCoreStage.ZkUpdateFinished,
-            SharedCoreStage.LocalCacheUpdateFinished,
-            false));
+  @Test
+  public void testIndexingQueriesDeletes() throws Exception {
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    run();
   }
-  private void testConcurrentIndexing(int maxIndexingThreads, int maxDocsPerThread) throws Exception {
-    int numIndexingThreads = new Random().nextInt(maxIndexingThreads) + 1;;
-    testConcurrentIndexing(numIndexingThreads, maxDocsPerThread, null);
+  /**
+   * Tests that concurrent indexing with deletes, concurrent queries and explicit failovers succeed.
+   */
+  // @Test
+  // TODO: This test flaps from time to time. The symptom of the failure is missing docs, i.e. indexing is declared successful
+  // but a query could not reproduce all of the docs. I was able to repro this with an NRT collection on vanilla 8.3 too.
+  // I have not root caused it yet. Keeping this test disabled until the problem is root caused and fixed.
+  public void todoTestIndexingQueriesDeletesFailovers() throws Exception {
+    final boolean includeDeletes = true;
+    includeIndexing(includeDeletes);
+    includeQueries();
+    includeFailovers();
+    run();
   }
-  private void testConcurrentIndexing(IndexingThreadInterleaver interleaver) throws Exception {
-    testConcurrentIndexing(2, 10, interleaver);
+  /**
+   * Starts all the threads that are included in the test (indexing, queries and failovers) in parallel,
+   * then waits for them to finish (run length depends on {@link #INDEXING_ITERATIONS}).
+   * At the end it makes sure that no critical section was breached and no unexpected error occurred,
+   * and then verifies the contents of the shared store by querying for all docs on a new replica.
+   */
+  private void run() throws Exception {
+    testState.startIncludedThreads();
+    testState.waitForThreadsToStop();
+    analyzeCoreConcurrencyStagesForBreaches();
+    testState.checkErrors();
+    Replica newReplica = addReplica();
+    queryNewReplicaAndVerifyAllDocsFound(newReplica);
   }
   /**
-   * Start desired number of concurrent indexing threads with each indexing random number of docs between 1 and maxDocsPerThread.
+   * Adds a thread to the test that goes over {@link #INDEXING_ITERATIONS} iterations or until it is interrupted.
+   * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION} threads
+   * by calling {@link #createIndexingThreads(int, int, boolean)}, starts them concurrently and waits for them to finish
+   * before going to the next iteration. Each indexing thread adds between 1 and {@link #MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST}
+   * docs.
    *
-   * At the end it verifies everything got indexed successfully on leader. No critical section got breached.
-   * Also verify the integrity of shared store contents by pulling them on a follower replica.
+   * @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing requests
+   *                       or not
    */
-  private void testConcurrentIndexing(int numIndexingThreads, int maxDocsPerThread, IndexingThreadInterleaver interleaver) throws Exception {
-    setupCluster(2);
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-
-    // this map tracks the async pull queues per solr process
-    Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>();
-
-    JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
-    CoreStorageClient storageClient1 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1), solrProcess1);
-    Map<String, CountDownLatch> asyncPullLatches1 = configureTestBlobProcessForNode(solrProcess1);
-
-    JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
-    CoreStorageClient storageClient2 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2), solrProcess2);
-    Map<String, CountDownLatch> asyncPullLatches2 = configureTestBlobProcessForNode(solrProcess2);
-
-    solrProcessesTaskTracker.put(solrProcess1.getNodeName(), asyncPullLatches1);
-    solrProcessesTaskTracker.put(solrProcess2.getNodeName(), asyncPullLatches2);
-
-    String collectionName = "sharedCollection";
-    int maxShardsPerNode = 1;
-    int numReplicas = 2;
-    // specify a comma-delimited string of shard names for multiple shards when using
-    // an implicit router
-    String shardNames = "shard1";
-    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
-
-    // get the leader replica and follower replicas
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica shardLeaderReplica = collection.getLeader("shard1");
-    Replica followerReplica = null;
-    for (Replica repl : collection.getSlice("shard1").getReplicas()) {
-      if (repl.getName() != shardLeaderReplica.getName()) {
-        followerReplica = repl;
-        break;
+  private void includeIndexing(boolean includeDeletes) {
+    Thread t = new Thread(() -> {
+      try {
+        for (int i = 0; i < INDEXING_ITERATIONS && !testState.stopRunning.get(); i++) {
+          int numIndexingThreads = random().nextInt(MAX_NUM_OF_CONCURRENT_INDEXING_REQUESTS_PER_ITERATION) + 1;
+          int numDocsToAddPerThread =
+              random().nextInt(MAX_NUM_OF_DOCS_PER_INDEXING_REQUEST) + 1;
+          Thread[] indexingThreads = createIndexingThreads(numIndexingThreads, numDocsToAddPerThread, includeDeletes);
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].start();
+          }
+          for (int j = 0; j < numIndexingThreads; j++) {
+            indexingThreads[j].join();
+          }
+          if (Thread.interrupted()) {
+            // we have been interrupted so we will stop running
+            testState.stopRunning.set(true);
+          }
+        }
+      } catch (Exception ex) {
+        testState.indexingErrors.add(ex.getMessage());
       }
-    }
+      // everything else stops running when indexing finishes
+      testState.stopRunning.set(true);
+    });
+    testState.includeThread(t);
+  }
-
-    JettySolrRunner shardLeaderSolrRunner = null;
-    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
-      if(solrRunner.getNodeName().equals(shardLeaderReplica.getNodeName())){
-        shardLeaderSolrRunner = solrRunner;
-        break;
-      }
-    }
-    List<String> progress = new ArrayList<>();
-
-    if(interleaver != null) {
-      configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress, interleaver);
-    } else {
-      configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress);
-    }
-
-    AtomicInteger totalDocs= new AtomicInteger(0);
+  /**
+   * Creates {@code numIndexingThreads} threads with each adding {@code numDocsToAddPerThread} docs.
+   *
+   * @param includeDeletes whether to randomly mark some docs for deletion and delete them in subsequent indexing requests
+   *                       or not
+   */
+  private Thread[] createIndexingThreads(int numIndexingThreads, int numDocsToAddPerThread, boolean includeDeletes) throws Exception {
     log.info("numIndexingThreads=" + numIndexingThreads);
     Thread[] indexingThreads = new Thread[numIndexingThreads];
-    ConcurrentLinkedQueue<String> indexingErrors = new ConcurrentLinkedQueue<>();
-    for (int i = 0; i < numIndexingThreads; i++) {
+    for (int i = 0; i < numIndexingThreads && !testState.stopRunning.get(); i++) {
       indexingThreads[i] = new Thread(() -> {
-        try {
-          // index between 1 to maxDocsPerThread docs
-          int numDocs = new Random().nextInt(maxDocsPerThread) + 1;
-          log.info("numDocs=" + numDocs);
-          UpdateRequest updateReq = new UpdateRequest();
-          for (int j = 0; j < numDocs; j++) {
-            int docId = totalDocs.incrementAndGet();
-            updateReq.add("id", Integer.toString(docId));
+        List<String> idsToAdd = new ArrayList<>();
+        // prepare the list of docs to add and delete outside the reattempt loop
+        for (int j = 0; j < numDocsToAddPerThread; j++) {
+          String docId = Integer.toString(testState.docIdGenerator.incrementAndGet());
+          idsToAdd.add(docId);
+        }
+        List<String> idsToDelete = testState.idBatchesToDelete.poll();
+
+        // attempt until succeeded or max attempts
+        for (int j = 0; j < MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST; j++) {
+          try {
+            String message = "attempt=" + (j + 1) + " numDocsToAdd=" + numDocsToAddPerThread + " docsToAdd=" + idsToAdd.toString();
+            if (idsToDelete != null) {
+              message += " numDocsToDelete=" + idsToDelete.size() + " docsToDelete=" + idsToDelete.toString();
+            }
+            log.info(message);
+
+            UpdateRequest updateReq = new UpdateRequest();
+            for (int k = 0; k < idsToAdd.size(); k++) {
+              updateReq.add("id", idsToAdd.get(k));
+            }
+            if (includeDeletes && idsToDelete != null) {
+              updateReq.deleteById(idsToDelete);
+            }
+            processUpdateRequest(updateReq);
+
+            testState.numDocsIndexed.addAndGet(numDocsToAddPerThread);
+            if (idsToDelete != null) {
+              testState.idsDeleted.addAll(idsToDelete);
+            }
+
+            // randomly select some docs that can be deleted
+            if (includeDeletes) {
+              List<String> idsThatCanBeDeleted = new ArrayList<>();
+              for (String indexedId : idsToAdd) {
+                if (random().nextBoolean()) {
+                  idsThatCanBeDeleted.add(indexedId);
+                }
+              }
+              if (!idsThatCanBeDeleted.isEmpty()) {
+                testState.idBatchesToDelete.offer(idsThatCanBeDeleted);
+              }
+            }
+            // indexing was successful, stop attempting
+            break;
+          } catch (Exception ex) {
+            // last attempt also failed, record the error
+            if (j == MAX_NUM_OF_ATTEMPTS_PER_INDEXING_REQUEST - 1) {
+              testState.indexingErrors.add(Throwables.getStackTraceAsString(ex));
+            }
           }
-          updateReq.commit(cloudClient, collectionName);
-        } catch (Exception ex) {
-          indexingErrors.add(ex.getMessage());
         }
       });
     }
+    return indexingThreads;
+  }
-    for (int i = 0; i < numIndexingThreads; i++) {
-      indexingThreads[i].start();
-    }
+  /**
+   * Sends an update request to the server, randomly choosing whether to send it with commit=true or not.
+   */
+  private void processUpdateRequest(UpdateRequest request) throws Exception {
+    UpdateResponse response = random().nextBoolean()
+        ? request.process(cluster.getSolrClient(), COLLECTION_NAME)
+        : request.commit(cluster.getSolrClient(), COLLECTION_NAME);
-    for (int i = 0; i < numIndexingThreads; i++) {
-      indexingThreads[i].join();
+    if (response.getStatus() != 0) {
+      throw new RuntimeException("Update request failed with status=" + response.getStatus());
     }
   }
-    log.info("totalDocs=" + totalDocs.intValue());
-
-    assertTrue(indexingErrors.toString(), indexingErrors.isEmpty());
+  /**
+   * Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
+   * In each iteration it creates between 1 and {@link #MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION} threads
+   * by calling {@link #createQueryThreads(int)}, starts them concurrently and waits for them to finish
+   * before going to the next iteration. To pace it better with indexing, a {@link #DELAY_MS_BETWEEN_EACH_QUERY_ITERATION}
+   * delay is added between each query iteration.
+   */
+  private void includeQueries() throws Exception {
+    Thread t = new Thread(() -> {
+      try {
+        while (!testState.stopRunning.get()) {
+          int numQueryThreads = random().nextInt(MAX_NUM_OF_CONCURRENT_QUERY_REQUESTS_PER_ITERATION) + 1;
+          Thread[] indexingThreads = createQueryThreads(numQueryThreads);
+          for (int j = 0; j < numQueryThreads; j++) {
+            indexingThreads[j].start();
+          }
+          for (int j = 0; j < numQueryThreads; j++) {
+            indexingThreads[j].join();
+          }
+          Thread.sleep(DELAY_MS_BETWEEN_EACH_QUERY_ITERATION);
+        }
+      } catch (Exception ex) {
+        testState.queryErrors.add(ex.getMessage());
+      }
+    });
+    testState.includeThread(t);
+  }
-    if(interleaver != null) {
-      assertNull(interleaver.error, interleaver.error);
+  /**
+   * Creates {@code numQueryThreads} threads with each querying all docs "*:*".
+   */
+  private Thread[] createQueryThreads(int numQueryThreads) throws Exception {
+    log.info("numQueryThreads=" + numQueryThreads);
+    Thread[] queryThreads = new Thread[numQueryThreads];
+    for (int i = 0; i < numQueryThreads && !testState.stopRunning.get(); i++) {
+      queryThreads[i] = new Thread(() -> {
+        try {
+          /**
+           * Don't have a way to ensure freshness of results yet. When we add something for query freshness later
+           * we may use that here.
+           *
+           * {@link SolrProcessTracker#corePullTracker} cannot help in concurrent query scenarios since there
+           * is no one-to-one guarantee between a query and an async pull.
+           */
+          cluster.getSolrClient().query(COLLECTION_NAME, new ModifiableSolrParams().set("q", "*:*"));
+        } catch (Exception ex) {
+          testState.queryErrors.add(Throwables.getStackTraceAsString(ex));
+        }
+      });
     }
+    return queryThreads;
+  }
-    assertFalse("no progress recorded", progress.isEmpty());
-
-    log.info(progress.toString());
-
-    assertCriticalSections(progress);
+  /**
+   * Adds a thread to the test that iterates until the test is stopped ({@link TestState#stopRunning}).
+   * In each iteration it fails over to a new leader by calling {@link #failOver()}. It waits
+   * for {@link #DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION} between iterations.
+   */
+  private void includeFailovers() throws Exception {
+    Thread t = new Thread(() -> {
+      try {
+        while (!testState.stopRunning.get()) {
+          failOver();
+          Thread.sleep(DELAY_MS_BETWEEN_EACH_FAILOVER_ITERATION);
+        }
+      } catch (Exception ex) {
+        testState.failoverError = Throwables.getStackTraceAsString(ex);
+      }
+    });
+    testState.includeThread(t);
+  }
-    // verify the update wasn't forwarded to the follower and it didn't commit by checking the core
-    // this gives us confidence that the subsequent query we do triggers the pull
-    CoreContainer replicaCC = getCoreContainer(followerReplica.getNodeName());
-    SolrCore core = null;
-    SolrClient followerDirectClient = null;
-    SolrClient leaderDirectClient = null;
-    try {
-      core = replicaCC.getCore(followerReplica.getCoreName());
-      // the follower should only have the default segments file
-      assertEquals(1, core.getDeletionPolicy().getLatestCommit().getFileNames().size());
+  /**
+   * Kills the current leader, waits for a new leader to be selected, and then brings the killed leader back up
+   * as a follower replica. Before bringing the replica back up it randomly decides whether to delete its core directory.
+   */
+  private void failOver() throws Exception {
+    DocCollection collection = getCollection();
+    Replica leaderReplicaBeforeSwitch = collection.getLeader(SHARD_NAME);
+    final String leaderReplicaNameBeforeSwitch = leaderReplicaBeforeSwitch.getName();
+    JettySolrRunner shardLeaderSolrRunnerBeforeSwitch = cluster.getReplicaJetty(leaderReplicaBeforeSwitch);
+    File leaderIndexDirBeforeSwitch = new File(shardLeaderSolrRunnerBeforeSwitch.getCoreContainer().getCoreRootDirectory()
+        + "/" + leaderReplicaBeforeSwitch.getCoreName());
+
+    shardLeaderSolrRunnerBeforeSwitch.stop();
+    cluster.waitForJettyToStop(shardLeaderSolrRunnerBeforeSwitch);
+    waitForState("Timed out waiting for new replica to become leader", COLLECTION_NAME, (liveNodes, collectionState) -> {
+      Slice slice = collectionState.getSlice(SHARD_NAME);
+      if (slice.getLeader() == null) {
+        return false;
+      }
+      if (slice.getLeader().getName().equals(leaderReplicaNameBeforeSwitch)) {
+        return false;
+      }
-      // query the leader directly to verify it should have the document
-      leaderDirectClient = getHttpSolrClient(shardLeaderReplica.getBaseUrl() + "/" + shardLeaderReplica.getCoreName());
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params
-          .set("q", "*:*")
-          .set("distrib", "false");
-      QueryResponse resp = leaderDirectClient.query(params);
-      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
-
-      // we want to wait until the pull completes so set up a count down latch for the follower's
-      // core that we'll wait until pull finishes for
-      CountDownLatch latch = new CountDownLatch(1);
-      Map<String, CountDownLatch> asyncPullTasks = solrProcessesTaskTracker.get(followerReplica.getNodeName());
-      asyncPullTasks.put(followerReplica.getCoreName(), latch);
+      return true;
+    });
-      // query the follower directly to trigger the pull, this query should yield no results
-      // as it returns immediately
-      followerDirectClient = getHttpSolrClient(followerReplica.getBaseUrl() + "/" + followerReplica.getCoreName());
-      resp = followerDirectClient.query(params);
-      assertEquals(0, resp.getResults().getNumFound());
+    if (random().nextBoolean()) {
+      FileUtils.deleteDirectory(leaderIndexDirBeforeSwitch);
+    }
-      // wait until pull is finished
-      assertTrue(latch.await(120, TimeUnit.SECONDS));
+    shardLeaderSolrRunnerBeforeSwitch.start();
+    cluster.waitForNode(shardLeaderSolrRunnerBeforeSwitch, -1);
-      // do another query to verify we've pulled everything
-      resp = followerDirectClient.query(params);
+    waitForState("Timed out waiting for restarted replica to become active", COLLECTION_NAME, (liveNodes, collectionState) -> {
+      Slice slice = collectionState.getSlice(SHARD_NAME);
+      if (slice.getReplica(leaderReplicaNameBeforeSwitch).getState() != Replica.State.ACTIVE) {
+        return false;
+      }
+      return true;
+    });
-      // verify we pulled
-      assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
+    setupSolrProcess(shardLeaderSolrRunnerBeforeSwitch);
+  }
-      // verify the document is present
-      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
-    } finally {
-      if (leaderDirectClient != null) {
-        leaderDirectClient.close();
-      }
-      if (followerDirectClient != null) {
-        followerDirectClient.close();
-      }
-      if (core != null) {
-        core.close();
-      }
-      if (cluster != null) {
-        cluster.shutdown();
+  /**
+   * Goes over all the lives of a node (a node gets a new life on restart) and then goes over each core's concurrency stages
+   * in each life. Logs the concurrency stages in the order they occurred and then analyzes those stages to make sure no
+   * critical section was breached.
+   */
+  private void analyzeCoreConcurrencyStagesForBreaches() {
+    // Goes over each node
+    for (Map.Entry<String, List<SolrProcessTracker>> nodeTracker :
+        testState.solrNodesTracker.entrySet()) {
+      String nodeName = nodeTracker.getKey();
+      int lifeCountForNode = nodeTracker.getValue().size();
+      // Goes over each life of a node
+      for (int i = 0; i < lifeCountForNode; i++) {
+        ConcurrentHashMap<String, ConcurrentLinkedQueue<String>> coreConcurrencyStageTracker = nodeTracker.getValue().get(i).coreConcurrencyStageTracker;
+        if (coreConcurrencyStageTracker.isEmpty()) {
+          log.info("life " + (i + 1) + "/" + lifeCountForNode + " of node " + nodeName + " is empty");
+        } else {
+          // Goes over each core
+          for (Map.Entry<String, ConcurrentLinkedQueue<String>> coreConcurrencyStagesEntry : coreConcurrencyStageTracker.entrySet()) {
+            String coreName = coreConcurrencyStagesEntry.getKey();
+            List<String> coreConcurrencyStages = new ArrayList<>(coreConcurrencyStagesEntry.getValue());
+            // Log line is truncated beyond a certain length, therefore printing them in batches of 200
+            List<List<String>> batches = Lists.partition(coreConcurrencyStages, 200);
+            if (batches.isEmpty()) {
+              batches = new ArrayList<>(1);
+              batches.add(new ArrayList<>(0));
+            }
+            for (int j = 0; j < batches.size(); j++) {
+              log.info("batch " + (j + 1) + "/" + batches.size()
+                  + " of core " + coreName
+                  + " of life " + (i + 1) + "/" + lifeCountForNode
+                  + " of node " + nodeName
+                  + "\n" + batches.get(j).toString());
+            }
+            analyzeCoreConcurrencyStagesForBreaches(coreName, coreConcurrencyStages);
+          }
+        }
       }
-    }
   }
-  private void assertCriticalSections(List<String> progress) {
-    String currentThreadId = null;
+  /**
+   * Analyzes a core's concurrency stages to make sure no critical section was breached.
+   */
+  private void analyzeCoreConcurrencyStagesForBreaches(String coreName, List<String> coreConcurrencyStages) {

Review comment:
       A note pointing to SharedCoreConcurrencyController here might help future devs understand the implied contracts we're testing here.
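For future readers: the note the reviewer suggests could be a one-line javadoc cross-reference on this helper, and the kind of contract it would point at can be illustrated with a small standalone check. The sketch below is hypothetical: it assumes stage entries are recorded as "threadId.stageName" strings and that a BlobPushStarted stage pairs with the BlobPushFinished stage seen elsewhere in this diff; it is not the PR's actual implementation of analyzeCoreConcurrencyStagesForBreaches.

// Hypothetical javadoc addition on the per-core analysis helper:
//   /**
//    * Analyzes a core's concurrency stages to make sure no critical section was breached.
//    * See SharedCoreConcurrencyController for the stages and ordering guarantees
//    * (the implied contracts) this test asserts.
//    */

import java.util.Arrays;
import java.util.List;

public class CriticalSectionCheckSketch {
  // Assumed stage names; only BlobPushFinished is confirmed by this diff.
  private static final String PUSH_STARTED = "BlobPushStarted";
  private static final String PUSH_FINISHED = "BlobPushFinished";

  /** Fails if two threads are ever inside the push critical section at the same time. */
  static void assertPushesDoNotInterleave(List<String> stages) {
    String threadInside = null;
    for (String entry : stages) {
      // Each entry is assumed to look like "threadId.stageName".
      int dot = entry.indexOf('.');
      String threadId = entry.substring(0, dot);
      String stage = entry.substring(dot + 1);
      if (PUSH_STARTED.equals(stage)) {
        if (threadInside != null) {
          throw new AssertionError("push critical section breached: " + threadId
              + " entered while " + threadInside + " was still inside");
        }
        threadInside = threadId;
      } else if (PUSH_FINISHED.equals(stage) && threadId.equals(threadInside)) {
        threadInside = null;
      }
    }
  }

  public static void main(String[] args) {
    // A fully serialized sequence passes; swapping the middle two entries would throw.
    assertPushesDoNotInterleave(Arrays.asList(
        "t1.BlobPushStarted", "t1.BlobPushFinished",
        "t2.BlobPushStarted", "t2.BlobPushFinished"));
  }
}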