anshumg commented on a change in pull request #2403:
URL: https://github.com/apache/lucene-solr/pull/2403#discussion_r589872439



##########
File path: 
lucene/core/src/java/org/apache/lucene/search/CancellableCollector.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, CancellableTask {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private Collector collector;
+  private AtomicBoolean isQueryCancelled;

Review comment:
       `isQueryCancelled` can also be declared final.

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() throws Exception {

Review comment:
       Seems like nothing in here throws an Exception.

##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/ActiveTasksListComponent.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.apache.solr.common.util.Utils.fromJSONString;

Review comment:
       This static import (`fromJSONString`) is also unused.

##########
File path: 
lucene/core/src/test/org/apache/lucene/search/TestCancellableCollector.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.NamedThreadFactory;
+
+public class TestCancellableCollector extends LuceneTestCase {
+  Directory dir;
+  IndexReader reader;
+  IndexSearcher searcher;
+  ExecutorService executor = null;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    dir = newDirectory();
+
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+    Random random = random();
+    for (int i = 0; i < 100; i++) {
+      Document doc = new Document();
+      doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
+      doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), 
Field.Store.NO));
+      doc.add(new SortedDocValuesField("field2", new 
BytesRef(Boolean.toString(i % 2 == 0))));
+      iw.addDocument(doc);
+
+      if (random.nextBoolean()) {
+        iw.commit();
+      }
+    }
+    reader = iw.getReader();
+    iw.close();
+
+    searcher = new IndexSearcher(reader);
+
+    executor =
+        new ThreadPoolExecutor(
+            4,
+            4,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("TestIndexSearcher"));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    reader.close();
+    dir.close();
+
+    if (executor != null) {
+      executor.shutdown();
+    }
+
+    executor = null;
+  }
+
+  private CancellableCollector buildCancellableCollector(
+      final int numHits, boolean delayStart, boolean delayCollection) {
+    TopScoreDocCollector topScoreDocCollector = 
TopScoreDocCollector.create(numHits, null, 1);
+    CancellableCollector collector = new 
CancellableCollector(topScoreDocCollector);
+
+    DummyCancellableCollector dummyCancellableCollector =
+        new DummyCancellableCollector(collector, delayStart, delayCollection);
+
+    return dummyCancellableCollector;
+  }
+
+  private void executeSearchTest(
+      IndexSearcher searcher, Query query, CancellableCollector 
cancellableCollector, int numHits)
+      throws Exception {
+    TopDocs topDocs = searcher.search(query, numHits);
+
+    searcher.search(query, cancellableCollector);
+
+    CancellableCollector internalCancellableCollector =
+        (CancellableCollector) cancellableCollector.getInternalCollector();
+    TopScoreDocCollector topScoreDocCollector =
+        (TopScoreDocCollector) 
internalCancellableCollector.getInternalCollector();
+
+    assertEquals(topDocs.totalHits.value, topScoreDocCollector.totalHits);
+  }
+
+  private void cancelQuery(CancellableCollector cancellableCollector, final 
int sleepTime) {
+    executor.submit(
+        () -> {
+
+          // Wait for some time to let the query start
+          try {
+            if (sleepTime > 0) {
+              Thread.sleep(sleepTime);
+            }
+
+            cancellableCollector.cancelTask();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e.getMessage());
+          }
+        });
+  }
+
+  public void testSearchWithoutCancellation() throws Exception {
+    CancellableCollector cancellableCollector = buildCancellableCollector(50, 
false, false);
+
+    Query query = new TermQuery(new Term("field", "1"));
+
+    executeSearchTest(searcher, query, cancellableCollector, 50);
+
+    query = new MatchAllDocsQuery();
+
+    cancellableCollector = buildCancellableCollector(100, false, false);
+
+    executeSearchTest(searcher, query, cancellableCollector, 50);
+  }
+
+  public void testSearchWithCancellationBeforeActualDocumentCollection() {
+    Query query = new MatchAllDocsQuery();
+
+    CancellableCollector cancellableCollector = 
buildCancellableCollector(5000, true, false);
+
+    expectThrows(
+        CancellableCollector.QueryCancelledException.class,
+        () -> {
+          // Cancel the query before the document collection starts
+          cancelQuery(cancellableCollector, 0);
+
+          executeSearchTest(searcher, query, cancellableCollector, 5000);
+        });
+  }
+
+  public void testSearchWithCancellationBetweenActualDocumentCollection() {
+    Query query = new MatchAllDocsQuery();
+
+    CancellableCollector cancellableCollector = 
buildCancellableCollector(5000, false, true);
+
+    expectThrows(
+        CancellableCollector.QueryCancelledException.class,
+        () -> {
+          // Cancel the query before the document collection starts
+          cancelQuery(cancellableCollector, 0);
+
+          executeSearchTest(searcher, query, cancellableCollector, 5000);
+        });
+  }
+
+  public class DummyCancellableCollector extends CancellableCollector {
+    private CancellableCollector collector;

Review comment:
       All of these could be final

##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/TaskManagementHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.util.plugin.SolrCoreAware;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.PATH;
+
+/**
+ * Abstract class which serves as the root of all task managing handlers
+ */
+public abstract class TaskManagementHandler extends RequestHandlerBase 
implements SolrCoreAware, PermissionNameProvider {
+    private ShardHandlerFactory shardHandlerFactory;
+
+    @Override
+    public void inform(SolrCore core) {
+        this.shardHandlerFactory = 
core.getCoreContainer().getShardHandlerFactory();
+    }
+
+    /**
+     * Process the actual request.
+     * extraParams is required for allowing sub handlers to pass in custom 
parameters to be put in the
+     * outgoing shard request
+     */
+    protected void processRequest(SolrQueryRequest req, ResponseBuilder rb, 
Map<String, String> extraParams) throws IOException {
+        ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+        List<SearchComponent> components = rb.components;
+
+        shardHandler.prepDistributed(rb);
+
+        for(SearchComponent c : components) {
+            c.prepare(rb);
+        }
+
+        if (!rb.isDistrib) {
+            for (SearchComponent component : components) {
+                component.process(rb);
+            }
+        } else {
+            ShardRequest sreq = new ShardRequest();
+
+            // Distribute to all shards
+            sreq.shards = rb.shards;
+            sreq.actualShards = sreq.shards;
+
+            sreq.responses = new ArrayList<>(sreq.actualShards.length);
+            rb.finished = new ArrayList<>();
+
+            for (String shard : sreq.actualShards) {
+                ModifiableSolrParams params = new 
ModifiableSolrParams(sreq.params);
+                String reqPath = (String) req.getContext().get(PATH);
+
+                params.set(CommonParams.QT, reqPath);
+                params.remove(ShardParams.SHARDS);      // not a top-level 
request
+                params.set(DISTRIB, "false");               // not a top-level 
request
+                params.remove("indent");
+                params.remove(CommonParams.HEADER_ECHO_PARAMS);
+                params.set(ShardParams.IS_SHARD, true);  // a sub (shard) 
request
+                params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
+                params.set(ShardParams.SHARD_URL, shard); // so the shard 
knows what was asked
+                params.set(CommonParams.OMIT_HEADER, false);
+
+                if (extraParams != null) {
+                    Iterator<Map.Entry<String, String>> iterator = 
extraParams.entrySet().iterator();
+
+                    while (iterator.hasNext()) {
+                        Map.Entry<String, String> entry = iterator.next();
+
+                        params.set(entry.getKey(), entry.getValue());

Review comment:
       Perhaps just use `Map.forEach` on `extraParams` here instead of the explicit iterator loop?

##########
File path: 
lucene/core/src/test/org/apache/lucene/search/TestCancellableCollector.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.NamedThreadFactory;
+
+public class TestCancellableCollector extends LuceneTestCase {
+  Directory dir;
+  IndexReader reader;
+  IndexSearcher searcher;
+  ExecutorService executor = null;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    dir = newDirectory();
+
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+    Random random = random();
+    for (int i = 0; i < 100; i++) {
+      Document doc = new Document();
+      doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
+      doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), 
Field.Store.NO));
+      doc.add(new SortedDocValuesField("field2", new 
BytesRef(Boolean.toString(i % 2 == 0))));
+      iw.addDocument(doc);
+
+      if (random.nextBoolean()) {
+        iw.commit();
+      }
+    }
+    reader = iw.getReader();
+    iw.close();
+
+    searcher = new IndexSearcher(reader);
+
+    executor =
+        new ThreadPoolExecutor(
+            4,
+            4,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("TestIndexSearcher"));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    reader.close();
+    dir.close();
+
+    if (executor != null) {
+      executor.shutdown();
+    }
+
+    executor = null;
+  }
+
+  private CancellableCollector buildCancellableCollector(
+      final int numHits, boolean delayStart, boolean delayCollection) {
+    TopScoreDocCollector topScoreDocCollector = 
TopScoreDocCollector.create(numHits, null, 1);
+    CancellableCollector collector = new 
CancellableCollector(topScoreDocCollector);
+
+    DummyCancellableCollector dummyCancellableCollector =

Review comment:
       Just return the new `DummyCancellableCollector` directly here instead of assigning it to a local variable first?

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() throws Exception {
+        ConcurrentHashSet<Integer> queryIdsSet = new ConcurrentHashSet<>();
+        ConcurrentHashSet<Integer> notFoundIdsSet = new ConcurrentHashSet<>();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertTrue(queryIdsSet.size() + notFoundIdsSet.size() == 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        NamedList<String> result = (NamedList<String>) 
queryResponse.get("taskList");
+
+        Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+        Set<Integer> presentQueryIDs = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+
+            presentQueryIDs.add(Integer.parseInt(entry.getKey()));
+        }
+
+        assertTrue(presentQueryIDs.size() > 0 && presentQueryIDs.size() <= 50);
+
+        Iterator<Integer> integerIterator = presentQueryIDs.iterator();
+
+        while (integerIterator.hasNext()) {
+            int value = integerIterator.next();
+
+            assertTrue (value >= 0 && value < 50);
+        }
+    }
+
+    @Test
+    public void testCheckSpecificQueryStatus() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("taskUUID", "25");
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+
+        request.setPath("/tasks/list");
+
+        NamedList<Object> queryResponse = 
cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        String result = (String) queryResponse.get("taskStatus");
+
+        assertFalse(result.contains("true"));
+    }
+
+    private CompletableFuture<Void> cancelQuery(final String queryID, final 
int sleepTime, Set<Integer> cancelledQueryIdsSet,

Review comment:
       `sleepTime` is always 4000. Do you really want this parameterized here?

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() throws Exception {
+        ConcurrentHashSet<Integer> queryIdsSet = new ConcurrentHashSet<>();
+        ConcurrentHashSet<Integer> notFoundIdsSet = new ConcurrentHashSet<>();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertTrue(queryIdsSet.size() + notFoundIdsSet.size() == 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        NamedList<String> result = (NamedList<String>) 
queryResponse.get("taskList");
+
+        Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+        Set<Integer> presentQueryIDs = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+
+            presentQueryIDs.add(Integer.parseInt(entry.getKey()));
+        }
+
+        assertTrue(presentQueryIDs.size() > 0 && presentQueryIDs.size() <= 50);
+
+        Iterator<Integer> integerIterator = presentQueryIDs.iterator();

Review comment:
       Simplify with
   
   ```
   for (int value : presentQueryIDs) {
     assertTrue(value >= 0 && value < 50);
   }
   ```

##########
File path: 
lucene/core/src/java/org/apache/lucene/search/CancellableCollector.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, CancellableTask {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private Collector collector;

Review comment:
       Can be final

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;

Review comment:
       Don't need to initialize with `null`

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() throws Exception {
+        ConcurrentHashSet<Integer> queryIdsSet = new ConcurrentHashSet<>();
+        ConcurrentHashSet<Integer> notFoundIdsSet = new ConcurrentHashSet<>();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertTrue(queryIdsSet.size() + notFoundIdsSet.size() == 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        NamedList<String> result = (NamedList<String>) 
queryResponse.get("taskList");
+
+        Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+        Set<Integer> presentQueryIDs = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+
+            presentQueryIDs.add(Integer.parseInt(entry.getKey()));
+        }
+
+        assertTrue(presentQueryIDs.size() > 0 && presentQueryIDs.size() <= 50);
+
+        Iterator<Integer> integerIterator = presentQueryIDs.iterator();
+
+        while (integerIterator.hasNext()) {
+            int value = integerIterator.next();
+
+            assertTrue (value >= 0 && value < 50);
+        }
+    }
+
+    @Test
+    public void testCheckSpecificQueryStatus() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("taskUUID", "25");
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+
+        request.setPath("/tasks/list");
+
+        NamedList<Object> queryResponse = 
cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        String result = (String) queryResponse.get("taskStatus");
+
+        assertFalse(result.contains("true"));
+    }
+
+    private CompletableFuture<Void> cancelQuery(final String queryID, final 
int sleepTime, Set<Integer> cancelledQueryIdsSet,
+                                          Set<Integer> notFoundQueryIdSet) {
+        return CompletableFuture.runAsync(() -> {
+            ModifiableSolrParams params = new ModifiableSolrParams();
+
+            params.set("cancelUUID", queryID);
+            @SuppressWarnings({"rawtypes"})
+            SolrRequest request = new QueryRequest(params);
+            request.setPath("/tasks/cancel");
+
+            // Wait for some time to let the query start
+            try {
+                if (sleepTime > 0) {
+                    Thread.sleep(sleepTime);
+                }
+
+                try {
+                    NamedList<Object> queryResponse = null;

Review comment:
       Don't need to initialize w/ `null` here either.

##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.lucene.search.CancellableTask;
+
+import java.io.IOException;
+
+/** Responsible for handling query cancellation requests */
+public class QueryCancellationComponent extends SearchComponent {
+    public static final String COMPONENT_NAME = "querycancellation";
+
+    private boolean shouldProcess;
+
+    @Override
+    public void prepare(ResponseBuilder rb) throws IOException
+    {
+        if (rb.isCancellation()) {
+            shouldProcess = true;
+        }
+    }
+
+    @Override
+    public void process(ResponseBuilder rb) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        String cancellationUUID = rb.getCancellationUUID();
+
+        if (cancellationUUID == null) {
+            throw new RuntimeException("Null query UUID seen");
+        }
+
+        CancellableTask cancellableTask = 
rb.req.getCore().getCancellableQueryTracker().getCancellableTask(cancellationUUID);
+
+        if (cancellableTask != null) {
+            cancellableTask.cancelTask();
+            rb.rsp.add("cancellationResult", "success");
+        } else {
+            rb.rsp.add("cancellationResult", "not found");
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        boolean failureSeen = false;
+        boolean queryNotFound = false;
+
+        for (ShardResponse r : sreq.responses) {
+
+            String cancellationResult = (String) r.getSolrResponse()
+                    .getResponse().get("cancellationResult");
+
+            if (!cancellationResult.equalsIgnoreCase("success")) {
+                if (cancellationResult.equalsIgnoreCase("not found")) {
+                    queryNotFound = true;
+                } else {
+                    failureSeen = true;
+                }
+
+                break;
+            }
+        }
+
+        if (failureSeen) {
+            rb.rsp.getValues().add("status", "Query with queryID " + 
rb.getCancellationUUID() +
+                    " could not be cancelled successfully");
+        } else if (queryNotFound) {
+            rb.rsp.getValues().add("status", "Query with queryID " + 
rb.getCancellationUUID() +
+                    " not found");
+        } else {
+            rb.rsp.getValues().add("status", "Query with queryID " + 
rb.getCancellationUUID() +
+                    " cancelled successfully");
+        }
+    }
+
+    @Override
+    public String getDescription() {
+        return "querycancellation";

Review comment:
       Do you want to add more to the description? Either way, it'd 
be good to be consistent between here and `ActiveTasksListComponent.java`

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("cancelUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse = null;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() throws Exception {
+        ConcurrentHashSet<Integer> queryIdsSet = new ConcurrentHashSet<>();
+        ConcurrentHashSet<Integer> notFoundIdsSet = new ConcurrentHashSet<>();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertTrue(queryIdsSet.size() + notFoundIdsSet.size() == 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse = null;

Review comment:
       Don't need to initialize as `null`

##########
File path: 
lucene/core/src/java/org/apache/lucene/search/CancellableCollector.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, CancellableTask {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}

Review comment:
       Or just reuse that?

##########
File path: 
lucene/core/src/test/org/apache/lucene/search/TestCancellableCollector.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.NamedThreadFactory;
+
+public class TestCancellableCollector extends LuceneTestCase {
+  Directory dir;
+  IndexReader reader;
+  IndexSearcher searcher;
+  ExecutorService executor = null;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    dir = newDirectory();
+
+    RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+    Random random = random();
+    for (int i = 0; i < 100; i++) {
+      Document doc = new Document();
+      doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
+      doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), 
Field.Store.NO));
+      doc.add(new SortedDocValuesField("field2", new 
BytesRef(Boolean.toString(i % 2 == 0))));
+      iw.addDocument(doc);
+
+      if (random.nextBoolean()) {
+        iw.commit();
+      }
+    }
+    reader = iw.getReader();
+    iw.close();
+
+    searcher = new IndexSearcher(reader);
+
+    executor =
+        new ThreadPoolExecutor(
+            4,
+            4,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("TestIndexSearcher"));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    reader.close();
+    dir.close();
+
+    if (executor != null) {
+      executor.shutdown();
+    }
+
+    executor = null;
+  }
+
+  private CancellableCollector buildCancellableCollector(
+      final int numHits, boolean delayStart, boolean delayCollection) {
+    TopScoreDocCollector topScoreDocCollector = 
TopScoreDocCollector.create(numHits, null, 1);
+    CancellableCollector collector = new 
CancellableCollector(topScoreDocCollector);
+
+    DummyCancellableCollector dummyCancellableCollector =
+        new DummyCancellableCollector(collector, delayStart, delayCollection);
+
+    return dummyCancellableCollector;
+  }
+
+  private void executeSearchTest(
+      IndexSearcher searcher, Query query, CancellableCollector 
cancellableCollector, int numHits)
+      throws Exception {
+    TopDocs topDocs = searcher.search(query, numHits);
+
+    searcher.search(query, cancellableCollector);
+
+    CancellableCollector internalCancellableCollector =
+        (CancellableCollector) cancellableCollector.getInternalCollector();
+    TopScoreDocCollector topScoreDocCollector =
+        (TopScoreDocCollector) 
internalCancellableCollector.getInternalCollector();
+
+    assertEquals(topDocs.totalHits.value, topScoreDocCollector.totalHits);
+  }
+
+  private void cancelQuery(CancellableCollector cancellableCollector, final 
int sleepTime) {
+    executor.submit(
+        () -> {
+
+          // Wait for some time to let the query start
+          try {
+            if (sleepTime > 0) {

Review comment:
       The value of sleepTime being passed is also always 0. Not sure if you 
were planning to add something here.

##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/ActiveTasksListComponent.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.apache.solr.common.util.Utils.fromJSONString;
+
+/** List the active tasks that can be cancelled */
+public class ActiveTasksListComponent extends SearchComponent {
+    public static final String COMPONENT_NAME = "activetaskslistcomponent";
+
+    private boolean shouldProcess;
+
+    @Override
+    public void prepare(ResponseBuilder rb) throws IOException {
+        if (rb.isTaskListRequest()) {
+            shouldProcess = true;
+        }
+    }
+
+    @Override
+    public void process(ResponseBuilder rb) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        if (rb.getTaskStatusCheckUUID() != null) {
+            boolean isActiveOnThisShard = 
rb.req.getCore().getCancellableQueryTracker().isQueryIdActive(rb.getTaskStatusCheckUUID());
+
+            rb.rsp.add("taskStatus", isActiveOnThisShard);
+            return;
+        }
+
+        rb.rsp.add("taskList", (MapWriter) ew -> {
+            Iterator<Map.Entry<String, String>> iterator = 
rb.req.getCore().getCancellableQueryTracker().getActiveQueriesGenerated();
+
+            while (iterator.hasNext()) {
+                Map.Entry<String, String> entry = iterator.next();
+                ew.put(entry.getKey(), entry.getValue());
+            }
+        });
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        NamedList<String> resultList = new NamedList<>();
+
+        for (ShardResponse r : sreq.responses) {
+
+            if (rb.getTaskStatusCheckUUID() != null) {
+                boolean isTaskActiveOnShard = (boolean) 
r.getSolrResponse().getResponse().get("taskStatus");
+
+                if (isTaskActiveOnShard == true) {
+                    rb.rsp.getValues().add("taskStatus", 
rb.getTaskStatusCheckUUID() + ":" + isTaskActiveOnShard);

Review comment:
       +1

##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/TaskManagementHandler.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.util.plugin.SolrCoreAware;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.PATH;
+
+/**
+ * Abstract class which serves as the root of all task managing handlers
+ */
+public abstract class TaskManagementHandler extends RequestHandlerBase 
implements SolrCoreAware, PermissionNameProvider {
+    private ShardHandlerFactory shardHandlerFactory;
+
+    @Override
+    public void inform(SolrCore core) {
+        this.shardHandlerFactory = 
core.getCoreContainer().getShardHandlerFactory();
+    }
+
+    /**
+     * Process the actual request.
+     * extraParams is required for allowing sub handlers to pass in custom 
parameters to be put in the
+     * outgoing shard request
+     */
+    protected void processRequest(SolrQueryRequest req, ResponseBuilder rb, 
Map<String, String> extraParams) throws IOException {
+        ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+        List<SearchComponent> components = rb.components;
+
+        shardHandler.prepDistributed(rb);
+
+        for(SearchComponent c : components) {
+            c.prepare(rb);
+        }
+
+        if (!rb.isDistrib) {
+            for (SearchComponent component : components) {
+                component.process(rb);
+            }
+        } else {
+            ShardRequest sreq = new ShardRequest();
+
+            // Distribute to all shards
+            sreq.shards = rb.shards;
+            sreq.actualShards = sreq.shards;
+
+            sreq.responses = new ArrayList<>(sreq.actualShards.length);
+            rb.finished = new ArrayList<>();
+
+            for (String shard : sreq.actualShards) {
+                ModifiableSolrParams params = new 
ModifiableSolrParams(sreq.params);
+                String reqPath = (String) req.getContext().get(PATH);
+
+                params.set(CommonParams.QT, reqPath);
+                params.remove(ShardParams.SHARDS);      // not a top-level 
request
+                params.set(DISTRIB, "false");               // not a top-level 
request
+                params.remove("indent");
+                params.remove(CommonParams.HEADER_ECHO_PARAMS);
+                params.set(ShardParams.IS_SHARD, true);  // a sub (shard) 
request
+                params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
+                params.set(ShardParams.SHARD_URL, shard); // so the shard 
knows what was asked
+                params.set(CommonParams.OMIT_HEADER, false);

Review comment:
       Can you reuse the code from `SearchHandler`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to