Created jia & AwaitsFix'ed the test ...

https://issues.apache.org/jira/browse/SOLR-16630



: Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST)
: From: Chris Hostetter <[email protected]>
: To: [email protected]
: Cc: "[email protected]" <[email protected]>
: Subject: Re: [solr] branch branch_9x updated: test case added for coordinator
:     role
: 
: 
: Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a 
: ridiculous number of times since you added it a week ago.
: 
: IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main 
: builds)
: 
: -Hoss
: 
: 
: : Date: Thu, 12 Jan 2023 07:54:33 +0000
: : From: [email protected]
: : Reply-To: [email protected]
: : To: "[email protected]" <[email protected]>
: : Subject: [solr] branch branch_9x updated: test case added for coordinator 
role
: : 
: : This is an automated email from the ASF dual-hosted git repository.
: : 
: : noble pushed a commit to branch branch_9x
: : in repository https://gitbox.apache.org/repos/asf/solr.git
: : 
: : 
: : The following commit(s) were added to refs/heads/branch_9x by this push:
: :      new ec9b152c31f test case added for coordinator role
: : ec9b152c31f is described below
: : 
: : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
: : Author: Noble Paul <[email protected]>
: : AuthorDate: Thu Jan 12 18:54:15 2023 +1100
: : 
: :     test case added for coordinator role
: : ---
: :  .../apache/solr/search/TestCoordinatorRole.java    | 412 
+++++++++++++++++++--
: :  1 file changed, 375 insertions(+), 37 deletions(-)
: : 
: : diff --git 
a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java 
b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : index 5e2dcfb70a8..6c4e845cf5a 100644
: : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: : @@ -17,69 +17,407 @@
: :  
: :  package org.apache.solr.search;
: :  
: : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
: : +import static org.apache.solr.common.params.CommonParams.TRUE;
: : +
: : +import java.lang.invoke.MethodHandles;
: : +import java.util.Date;
: : +import java.util.EnumSet;
: :  import java.util.HashSet;
: :  import java.util.List;
: : +import java.util.Random;
: :  import java.util.Set;
: : +import java.util.concurrent.ExecutorService;
: : +import java.util.concurrent.Future;
: : +import java.util.concurrent.TimeUnit;
: : +import java.util.concurrent.atomic.AtomicBoolean;
: : +import java.util.function.Consumer;
: :  import org.apache.solr.client.solrj.SolrQuery;
: :  import org.apache.solr.client.solrj.impl.CloudSolrClient;
: : +import org.apache.solr.client.solrj.impl.HttpSolrClient;
: :  import org.apache.solr.client.solrj.request.CollectionAdminRequest;
: :  import org.apache.solr.client.solrj.request.QueryRequest;
: :  import org.apache.solr.client.solrj.request.UpdateRequest;
: :  import org.apache.solr.client.solrj.response.QueryResponse;
: : +import org.apache.solr.cloud.MiniSolrCloudCluster;
: :  import org.apache.solr.cloud.SolrCloudTestCase;
: : +import org.apache.solr.common.SolrDocumentList;
: : +import org.apache.solr.common.SolrException;
: :  import org.apache.solr.common.SolrInputDocument;
: :  import org.apache.solr.common.cloud.DocCollection;
: : +import org.apache.solr.common.cloud.Replica;
: : +import org.apache.solr.common.params.CommonParams;
: : +import org.apache.solr.common.util.ExecutorUtil;
: : +import org.apache.solr.common.util.SimpleOrderedMap;
: : +import org.apache.solr.common.util.SolrNamedThreadFactory;
: : +import org.apache.solr.common.util.Utils;
: :  import org.apache.solr.core.NodeRoles;
: :  import org.apache.solr.embedded.JettySolrRunner;
: :  import org.apache.solr.servlet.CoordinatorHttpSolrCall;
: : -import org.junit.BeforeClass;
: : +import org.slf4j.Logger;
: : +import org.slf4j.LoggerFactory;
: :  
: :  public class TestCoordinatorRole extends SolrCloudTestCase {
: : -
: : -  @BeforeClass
: : -  public static void setupCluster() throws Exception {
: : -    configureCluster(4).addConfig("conf", 
configset("cloud-minimal")).configure();
: : -  }
: : +  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
: :  
: :    public void testSimple() throws Exception {
: : -    CloudSolrClient client = cluster.getSolrClient();
: : -    String COLLECTION_NAME = "test_coll";
: : -    String SYNTHETIC_COLLECTION = 
CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: : -    CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: : -        .process(cluster.getSolrClient());
: : -    cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: : -    UpdateRequest ur = new UpdateRequest();
: : -    for (int i = 0; i < 10; i++) {
: : -      SolrInputDocument doc2 = new SolrInputDocument();
: : -      doc2.addField("id", "" + i);
: : -      ur.add(doc2);
: : -    }
: : +    MiniSolrCloudCluster cluster =
: : +        configureCluster(4).addConfig("conf", 
configset("cloud-minimal")).configure();
: : +    try {
: : +      CloudSolrClient client = cluster.getSolrClient();
: : +      String COLLECTION_NAME = "test_coll";
: : +      String SYNTHETIC_COLLECTION = 
CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: : +      CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 
2)
: : +          .process(cluster.getSolrClient());
: : +      cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: : +      UpdateRequest ur = new UpdateRequest();
: : +      for (int i = 0; i < 10; i++) {
: : +        SolrInputDocument doc2 = new SolrInputDocument();
: : +        doc2.addField("id", "" + i);
: : +        ur.add(doc2);
: : +      }
: :  
: : -    ur.commit(client, COLLECTION_NAME);
: : -    QueryResponse rsp = client.query(COLLECTION_NAME, new 
SolrQuery("*:*"));
: : -    assertEquals(10, rsp.getResults().getNumFound());
: : +      ur.commit(client, COLLECTION_NAME);
: : +      QueryResponse rsp = client.query(COLLECTION_NAME, new 
SolrQuery("*:*"));
: : +      assertEquals(10, rsp.getResults().getNumFound());
: :  
: : +      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: : +      JettySolrRunner coordinatorJetty = null;
: : +      try {
: : +        coordinatorJetty = cluster.startJettySolrRunner();
: : +      } finally {
: : +        System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : +      }
: : +      QueryResponse rslt =
: : +          new QueryRequest(new SolrQuery("*:*"))
: : +              .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: : +              .process(client, COLLECTION_NAME);
: : +
: : +      assertEquals(10, rslt.getResults().size());
: : +
: : +      DocCollection collection =
: : +          
cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: : +      assertNotNull(collection);
: : +
: : +      Set<String> expectedNodes = new HashSet<>();
: : +      expectedNodes.add(coordinatorJetty.getNodeName());
: : +      collection.forEachReplica((s, replica) -> 
expectedNodes.remove(replica.getNodeName()));
: : +      assertTrue(expectedNodes.isEmpty());
: : +    } finally {
: : +      cluster.shutdown();
: : +    }
: : +  }
: : +
: : +  public void testNRTRestart() throws Exception {
: : +    // we restart jetty and expect to find on disk data - need a local fs 
directory
: : +    useFactory(null);
: : +    String COLL = "coordinator_test_coll";
: : +    MiniSolrCloudCluster cluster =
: : +        configureCluster(3)
: : +            .withJettyConfig(jetty -> jetty.enableV2(true))
: : +            .addConfig("conf", configset("conf2"))
: : +            .configure();
: :      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: : -    JettySolrRunner coordinatorJetty = null;
: : +    JettySolrRunner qaJetty = cluster.startJettySolrRunner();
: : +    String qaJettyBase = qaJetty.getBaseUrl().toString();
: : +    System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : +    ExecutorService executor =
: : +        ExecutorUtil.newMDCAwareSingleThreadExecutor(new 
SolrNamedThreadFactory("manipulateJetty"));
: :      try {
: : -      coordinatorJetty = cluster.startJettySolrRunner();
: : +      CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
: : +          .process(cluster.getSolrClient());
: : +      cluster.waitForActiveCollection(COLL, 1, 2);
: : +      DocCollection docColl =
: : +          
cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
: : +      Replica nrtReplica = 
docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
: : +      assertNotNull(nrtReplica);
: : +      String nrtCore = nrtReplica.getCoreName();
: : +      Replica pullReplica = 
docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
: : +      assertNotNull(pullReplica);
: : +      String pullCore = pullReplica.getCoreName();
: : +
: : +      SolrInputDocument sid = new SolrInputDocument();
: : +      sid.addField("id", "123");
: : +      sid.addField("desc_s", "A Document");
: : +      JettySolrRunner nrtJetty = null;
: : +      JettySolrRunner pullJetty = null;
: : +      for (JettySolrRunner j : cluster.getJettySolrRunners()) {
: : +        String nodeName = j.getNodeName();
: : +        if (nodeName.equals(nrtReplica.getNodeName())) {
: : +          nrtJetty = j;
: : +        } else if (nodeName.equals(pullReplica.getNodeName())) {
: : +          pullJetty = j;
: : +        }
: : +      }
: : +      assertNotNull(nrtJetty);
: : +      assertNotNull(pullJetty);
: : +      try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) 
{
: : +        client.add(COLL, sid);
: : +        client.commit(COLL);
: : +        assertEquals(
: : +            nrtCore,
: : +            getHostCoreName(
: : +                COLL, qaJettyBase, client, p -> p.add("shards.preference", 
"replica.type:NRT")));
: : +        assertEquals(
: : +            pullCore,
: : +            getHostCoreName(
: : +                COLL, qaJettyBase, client, p -> p.add("shards.preference", 
"replica.type:PULL")));
: : +        // Now , kill NRT jetty
: : +        JettySolrRunner nrtJettyF = nrtJetty;
: : +        JettySolrRunner pullJettyF = pullJetty;
: : +        Random r = random();
: : +        final long establishBaselineMs = r.nextInt(1000);
: : +        final long nrtDowntimeMs = r.nextInt(10000);
: : +        // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is 
just to simplify our
: : +        // indexing code,
: : +        // based on the fact that our indexing is based on a PULL-node 
client.
: : +        final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
: : +        Future<?> jettyManipulationFuture =
: : +            executor.submit(
: : +                () -> {
: : +                  // we manipulate the jetty instances in a separate 
thread to more closely mimic
: : +                  // the behavior we'd
: : +                  // see irl.
: : +                  try {
: : +                    Thread.sleep(establishBaselineMs);
: : +                    log.info("stopping NRT jetty ...");
: : +                    nrtJettyF.stop();
: : +                    log.info("NRT jetty stopped.");
: : +                    Thread.sleep(nrtDowntimeMs); // let NRT be down for a 
while
: : +                    log.info("restarting NRT jetty ...");
: : +                    nrtJettyF.start(true);
: : +                    log.info("NRT jetty restarted.");
: : +                    // once NRT is back up, we expect PULL to continue 
serving until the TTL on ZK
: : +                    // state
: : +                    // used for query request routing has expired (60s). 
But here we force a return
: : +                    // to NRT
: : +                    // by stopping the PULL replica after a brief delay ...
: : +                    Thread.sleep(pullServiceTimeMs);
: : +                    log.info("stopping PULL jetty ...");
: : +                    pullJettyF.stop();
: : +                    log.info("PULL jetty stopped.");
: : +                  } catch (Exception e) {
: : +                    throw new RuntimeException(e);
: : +                  }
: : +                });
: : +        String hostCore;
: : +        long start = new Date().getTime();
: : +        long individualRequestStart = start;
: : +        int count = 0;
: : +        while (nrtCore.equals(
: : +            hostCore =
: : +                getHostCoreName(
: : +                    COLL,
: : +                    qaJettyBase,
: : +                    client,
: : +                    p -> p.add("shards.preference", "replica.type:NRT")))) 
{
: : +          count++;
: : +          individualRequestStart = new Date().getTime();
: : +        }
: : +        long now = new Date().getTime();
: : +        log.info(
: : +            "phase1 NRT queries count={}, overall_duration={}, 
baseline_expected_overall_duration={}, switch-to-pull_duration={}",
: : +            count,
: : +            now - start,
: : +            establishBaselineMs,
: : +            now - individualRequestStart);
: : +        // default tolerance of 500ms below should suffice. Failover to 
PULL for this case should be
: : +        // very fast,
: : +        // because our QA-based client already knows both replicas are 
active, the index is stable,
: : +        // so the moment
: : +        // the client finds NRT is down it should be able to failover 
immediately and transparently
: : +        // to PULL.
: : +        assertEquals(
: : +            "when we break out of the NRT query loop, should be b/c routed 
to PULL",
: : +            pullCore,
: : +            hostCore);
: : +        SolrInputDocument d = new SolrInputDocument();
: : +        d.addField("id", "345");
: : +        d.addField("desc_s", "Another Document");
: : +        // attempts to add another doc while NRT is down should fail, then 
eventually succeed when
: : +        // NRT comes back up
: : +        count = 0;
: : +        start = new Date().getTime();
: : +        individualRequestStart = start;
: : +        for (; ; ) {
: : +          try {
: : +            client.add(COLL, d);
: : +            client.commit(COLL);
: : +            break;
: : +          } catch (SolrException ex) {
: : +            // we expect these until nrtJetty is back up.
: : +            count++;
: : +            Thread.sleep(100);
: : +          }
: : +          individualRequestStart = new Date().getTime();
: : +        }
: : +        now = new Date().getTime();
: : +        log.info(
: : +            "successfully added another doc; duration: {}, 
overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
: : +            now - individualRequestStart,
: : +            now - start,
: : +            nrtDowntimeMs,
: : +            count);
: : +        // NRT replica is back up, registered as available with Zk, and 
availability info has been
: : +        // pulled down by
: : +        // our PULL-replica-based `client`, forwarded indexing command to 
NRT, index/commit
: : +        // completed. All of this
: : +        // accounts for the 3000ms tolerance allowed for below. This is 
not a strict value, and if
: : +        // it causes failures
: : +        // regularly we should feel free to increase the tolerance; but 
it's meant to provide a
: : +        // stable baseline from
: : +        // which to detect regressions.
: : +        count = 0;
: : +        start = new Date().getTime();
: : +        individualRequestStart = start;
: : +        while (pullCore.equals(
: : +            hostCore =
: : +                getHostCoreName(
: : +                    COLL,
: : +                    qaJettyBase,
: : +                    client,
: : +                    p -> {
: : +                      p.set(CommonParams.Q, "id:345");
: : +                      p.add("shards.preference", "replica.type:NRT");
: : +                    }))) {
: : +          count++;
: : +          Thread.sleep(100);
: : +          individualRequestStart = new Date().getTime();
: : +        }
: : +        now = new Date().getTime();
: : +        log.info(
: : +            "query retries between NRT index-ready and query-ready: {}; 
overall_duration={}; baseline_expected_overall_duration={}; 
failover-request_duration={}",
: : +            count,
: : +            now - start,
: : +            pullServiceTimeMs,
: : +            now - individualRequestStart);
: : +        assertEquals(nrtCore, hostCore);
: : +        // allow any exceptions to propagate
: : +        jettyManipulationFuture.get();
: : +        if (true) return;
: : +
: : +        // next phase: just toggle a bunch
: : +        // TODO: could separate this out into a different test method, but 
this should suffice for
: : +        // now
: : +        pullJetty.start(true);
: : +        AtomicBoolean done = new AtomicBoolean();
: : +        long runMinutes = 1;
: : +        long finishTimeMs =
: : +            new Date().getTime() + 
TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
: : +        JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, 
pullJettyF};
: : +        Random threadRandom = new Random(r.nextInt());
: : +        Future<Integer> f =
: : +            executor.submit(
: : +                () -> {
: : +                  int iteration = 0;
: : +                  while (new Date().getTime() < finishTimeMs && 
!done.get()) {
: : +                    int idx = iteration++ % jettys.length;
: : +                    JettySolrRunner toManipulate = jettys[idx];
: : +                    try {
: : +                      int serveTogetherTime = threadRandom.nextInt(7000);
: : +                      int downTime = threadRandom.nextInt(7000);
: : +                      log.info("serving together for {}ms", 
serveTogetherTime);
: : +                      Thread.sleep(serveTogetherTime);
: : +                      log.info("stopping {} ...", idx);
: : +                      toManipulate.stop();
: : +                      log.info("stopped {}.", idx);
: : +                      Thread.sleep(downTime);
: : +                      log.info("restarting {} ...", idx);
: : +                      toManipulate.start(true);
: : +                      log.info("restarted {}.", idx);
: : +                    } catch (Exception e) {
: : +                      throw new RuntimeException(e);
: : +                    }
: : +                  }
: : +                  done.set(true);
: : +                  return iteration;
: : +                });
: : +        count = 0;
: : +        start = new Date().getTime();
: : +        try {
: : +          do {
: : +            pullCore.equals(
: : +                hostCore =
: : +                    getHostCoreName(
: : +                        COLL,
: : +                        qaJettyBase,
: : +                        client,
: : +                        p -> {
: : +                          p.set(CommonParams.Q, "id:345");
: : +                          p.add("shards.preference", "replica.type:NRT");
: : +                        }));
: : +            count++;
: : +            Thread.sleep(100);
: : +          } while (!done.get());
: : +        } finally {
: : +          final String result;
: : +          if (done.getAndSet(true)) {
: : +            result = "Success";
: : +          } else {
: : +            // not yet set to done, completed abnormally (exception will 
be thrown beyond `finally`
: : +            // block)
: : +            result = "Failure";
: : +          }
: : +          Integer toggleCount = f.get();
: : +          long secondsDuration =
: : +              TimeUnit.SECONDS.convert(new Date().getTime() - start, 
TimeUnit.MILLISECONDS);
: : +          log.info(
: : +              "{}! {} seconds, {} toggles, {} requests served",
: : +              result,
: : +              secondsDuration,
: : +              toggleCount,
: : +              count);
: : +        }
: : +      }
: :      } finally {
: : -      System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: : +      try {
: : +        ExecutorUtil.shutdownAndAwaitTermination(executor);
: : +      } finally {
: : +        cluster.shutdown();
: : +      }
: :      }
: : -    QueryResponse rslt =
: : -        new QueryRequest(new SolrQuery("*:*"))
: : -            .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: : -            .process(client, COLLECTION_NAME);
: : -
: : -    assertEquals(10, rslt.getResults().size());
: : +  }
: :  
: : -    DocCollection collection =
: : -        
cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: : -    assertNotNull(collection);
: : +  @SuppressWarnings("rawtypes")
: : +  private String getHostCoreName(
: : +      String COLL, String qaNode, HttpSolrClient solrClient, 
Consumer<SolrQuery> p)
: : +      throws Exception {
: :  
: : -    Set<String> expectedNodes = new HashSet<>();
: : -    expectedNodes.add(coordinatorJetty.getNodeName());
: : -    collection.forEachReplica((s, replica) -> 
expectedNodes.remove(replica.getNodeName()));
: : -    assertTrue(expectedNodes.isEmpty());
: : +    boolean found = false;
: : +    SolrQuery q = new SolrQuery("*:*");
: : +    q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
: : +    p.accept(q);
: : +    StringBuilder sb =
: : +        new 
StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
: : +    q.forEach(e -> 
sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
: : +    SolrDocumentList docs = null;
: : +    for (int i = 0; i < 100; i++) {
: : +      try {
: : +        SimpleOrderedMap rsp =
: : +            (SimpleOrderedMap)
: : +                Utils.executeGET(solrClient.getHttpClient(), 
sb.toString(), Utils.JAVABINCONSUMER);
: : +        docs = (SolrDocumentList) rsp.get("response");
: : +        if (docs.size() > 0) {
: : +          found = true;
: : +          break;
: : +        }
: : +      } catch (SolrException ex) {
: : +        // we know we're doing tricky things that might cause transient 
errors
: : +        // TODO: all these query requests go to the QA node -- should QA 
propagate internal request
: : +        // errors
: : +        //  to the external client (and the external client retry?) or 
should QA attempt to failover
: : +        // transparently
: : +        //  in the event of an error?
: : +        if (i < 5) {
: : +          log.info("swallowing transient error", ex);
: : +        } else {
: : +          log.error("only expect actual _errors_ within a small window 
(e.g. 500ms)", ex);
: : +          fail("initial error time threshold exceeded");
: : +        }
: : +      }
: : +      Thread.sleep(100);
: : +    }
: : +    assertTrue(found);
: : +    return (String) docs.get(0).getFieldValue("_core_");
: :    }
: :  }
: : 
: : 
: 
: -Hoss
: http://www.lucidworks.com/
: 

-Hoss
http://www.lucidworks.com/

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to