madrob commented on a change in pull request #2403:
URL: https://github.com/apache/lucene-solr/pull/2403#discussion_r595245360



##########
File path: 
solr/core/src/java/org/apache/solr/handler/component/QueryCancellationComponent.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.client.solrj.util.Cancellable;
+
+import java.io.IOException;
+
+/** Responsible for handling query cancellation requests */
+public class QueryCancellationComponent extends SearchComponent {
+    public static final String COMPONENT_NAME = "querycancellation";
+
+    private boolean shouldProcess;
+
+    @Override
+    public void prepare(ResponseBuilder rb) throws IOException
+    {
+        if (rb.isCancellation()) {
+            shouldProcess = true;
+        }
+    }
+
+    @Override
+    public void process(ResponseBuilder rb) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        String cancellationUUID = rb.getCancellationUUID();
+
+        if (cancellationUUID == null) {
+            throw new RuntimeException("Null query UUID seen");
+        }
+
+        Cancellable cancellableTask = 
rb.req.getCore().getCancellableQueryTracker().getCancellableTask(cancellationUUID);
+
+        if (cancellableTask != null) {
+            cancellableTask.cancel();
+            rb.rsp.add("cancellationResult", "success");
+        } else {
+            rb.rsp.add("cancellationResult", "not found");
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        boolean queryFound = false;
+
+        for (ShardResponse r : sreq.responses) {
+
+            String cancellationResult = (String) r.getSolrResponse()
+                    .getResponse().get("cancellationResult");
+
+            if (cancellationResult.equalsIgnoreCase("success")) {
+                queryFound = true;
+
+                break;
+            }
+        }
+
+        // If any shard sees the query as present, then we mark the query as 
successfully cancelled. If no shard found
+        // the query, then that can denote that the query was not found. This 
is important since the query cancellation
+        // request is broadcast to all shards, and the query might have 
completed on some shards but not on others
+
+        if(queryFound) {
+            rb.rsp.getValues().add("status", "Query with queryID " + 
rb.getCancellationUUID() +
+                    " cancelled successfully");
+            rb.rsp.getValues().add("responseCode", 200 /* HTTP OK */);
+        } else {
+            rb.rsp.getValues().add("status", "Query with queryID " + 
rb.getCancellationUUID() +
+                    " not found");
+            rb.rsp.getValues().add("responseCode", 401 /* HTTP NOT FOUND */);

Review comment:
       Do you mean 404? The inline comment says HTTP NOT FOUND, but 401 is 
Unauthorized; Not Found is 404.

##########
File path: solr/core/src/java/org/apache/solr/search/CancellableCollector.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.FilterLeafCollector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.solr.client.solrj.util.Cancellable;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, Cancellable {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private final Collector collector;
+  private final AtomicBoolean isQueryCancelled;
+
+  public CancellableCollector(Collector collector) {
+    Objects.requireNonNull(collector, "Internal collector not provided but 
wrapper collector accessed");
+
+    this.collector = collector;
+    this.isQueryCancelled = new AtomicBoolean();
+  }
+
+  @Override
+  public LeafCollector getLeafCollector(LeafReaderContext context) throws 
IOException {
+
+    if (isQueryCancelled.compareAndSet(true, true)) {

Review comment:
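       If it's already true, set it to true...? `compareAndSet(true, true)` 
never changes the value, so a plain `isQueryCancelled.get()` would express the 
same check.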

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("queryUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));

Review comment:
       This test should assert on the `responseCode` value as well as, or 
instead of, the status message.

##########
File path: solr/solr-ref-guide/src/task-management.adoc
##########
@@ -0,0 +1,66 @@
+= Task Management
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+Solr allows users to control their running tasks by monitoring them, 
specifying tasks as cancellation enabled and allowing
+cancellation of the same.
+
+This is achieved using the task management interface. Currently, this is 
supported for queries.
+
+== Types of Operations
+Task management interface supports the following types of operations:
+
+1. List all currently running cancellable tasks.
+2. Cancel a specific task.
+3. Query the status of a specific task.
+
+== Listing All Active Cancellable Tasks
+To list all the active cancellable tasks currently running, please use the 
following syntax:
+
+`\http://localhost:8983/solr/tasks/list`
+
+==== Sample Response
+
+`{responseHeader={status=0, QTime=11370}, 
taskList={0=q=*%3A*&canCancel=true&queryUUID=0&_stateVer_=collection1%3A4&wt=javabin&version=2,
 
5=q=*%3A*&canCancel=true&queryUUID=5&_stateVer_=collection1%3A4&wt=javabin&version=2,
 
7=q=*%3A*&canCancel=true&queryUUID=7&_stateVer_=collection1%3A4&wt=javabin&version=2}`
+
+== Cancelling An Active Cancellable Task
+To cancel an active task, please use the following syntax:
+
+`\http://localhost:8983/solr/tasks/cancel?queryUUID=foobar`
+
+==== Sample Response
+===== If the task UUID was found and successfully cancelled:
+
+`{responseHeader={status=0, QTime=39}, status=Query with queryID 85 cancelled 
successfully}`
+
+===== If the task UUID was not found
+
+`{responseHeader={status=0, QTime=39}, status=Query with queryID 85 not found}`
+
+== Check Status of a Specific Task
+To check the status of a specific task, please use the following syntax:
+
+`\http://localhost:8983/solr/tasks/list?taskUUID=foobar`

Review comment:
       Can we specify multiple tasks in a single query?

##########
File path: solr/solr-ref-guide/src/task-management.adoc
##########
@@ -0,0 +1,73 @@
+= Task Management
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+Solr allows users to control their running tasks by monitoring them, 
specifying tasks as cancellation enabled and allowing
+cancellation of the same.
+
+This is achieved using the task management interface. Currently, this is 
supported for queries.
+
+== Types of Operations
+Task management interface (TMI) supports the following types of operations:
+
+1. List all currently running cancellable tasks.
+2. Cancel a specific task.
+3. Query the status of a specific task.
+
+== Listing All Active Cancellable Tasks
+To list all the active cancellable tasks currently running, please use the 
following syntax:
+
+`\http://localhost:8983/solr/tasks/list`
+
+==== Sample Response
+
+`{responseHeader={status=0, QTime=11370}, 
taskList={0=q=*%3A*&canCancel=true&queryUUID=0&_stateVer_=collection1%3A4&wt=javabin&version=2,
 
5=q=*%3A*&canCancel=true&queryUUID=5&_stateVer_=collection1%3A4&wt=javabin&version=2,
 
7=q=*%3A*&canCancel=true&queryUUID=7&_stateVer_=collection1%3A4&wt=javabin&version=2}`
+
+== Cancelling An Active Cancellable Task
+To cancel an active task, please use the following syntax:
+
+`\http://localhost:8983/solr/tasks/cancel?cancelUUID=foobar`
+
+==== cancelUUID Parameter
+This parameter is used to specify the UUID of the task to be cancelled.
+
+==== Sample Response
+===== If the task UUID was found and successfully cancelled:
+
+`{responseHeader={status=0, QTime=39}, status=Query with queryID 85 cancelled 
successfully}`
+
+===== If the task UUID was not found
+
+`{responseHeader={status=0, QTime=39}, status=Query with queryID 85 not found}`
+
+===== If the cancellation failed
+
+`{responseHeader={status=0, QTime=39}, status=Query with queryID 85 could not 
be cancelled successfully}`
+
+== Check Status of a Specific Task
+To check the status of a specific task, please use the following syntax:
+
+`\http://localhost:8983/solr/tasks/list?taskUUID=foobar`
+
+==== taskUUID Parameter
+`taskUUID` parameter can be used to specify a task UUID whose status can be 
checked.
+
+==== Sample Response
+`{responseHeader={status=0, QTime=6128}, taskStatus=foobar:true}`

Review comment:
       Right, and when checking for existence `taskStatus=foobar:true` does not 
feel meaningful. Maybe something a little more verbose: `taskStatus={id:foobar, 
status:active}` would be easier to understand.

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("queryUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() {
+        Set<Integer> queryIdsSet = ConcurrentHashMap.newKeySet();
+        Set<Integer> notFoundIdsSet = ConcurrentHashMap.newKeySet();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertEquals("Total query count did not match the expected value",
+                queryIdsSet.size() + notFoundIdsSet.size(), 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        NamedList<String> result = (NamedList<String>) 
queryResponse.get("taskList");
+
+        Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+        Set<Integer> presentQueryIDs = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+
+            presentQueryIDs.add(Integer.parseInt(entry.getKey()));
+        }
+
+        assertTrue(presentQueryIDs.size() > 0 && presentQueryIDs.size() <= 50);
+
+        for (int value : presentQueryIDs) {
+            assertTrue(value >= 0 && value < 50);
+        }
+    }
+
+    @Test
+    public void testCheckSpecificQueryStatus() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("taskUUID", "25");
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+
+        request.setPath("/tasks/list");
+
+        NamedList<Object> queryResponse = 
cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        String result = (String) queryResponse.get("taskStatus");
+
+        assertFalse(result.contains("true"));
+    }
+
+    private CompletableFuture<Void> cancelQuery(final String queryID, final 
int sleepTime, Set<Integer> cancelledQueryIdsSet,
+                                          Set<Integer> notFoundQueryIdSet) {
+        return CompletableFuture.runAsync(() -> {
+            ModifiableSolrParams params = new ModifiableSolrParams();
+
+            params.set("queryUUID", queryID);
+            @SuppressWarnings({"rawtypes"})
+            SolrRequest request = new QueryRequest(params);
+            request.setPath("/tasks/cancel");
+
+            // Wait for some time to let the query start
+            try {
+                if (sleepTime > 0) {
+                    Thread.sleep(sleepTime);
+                }
+
+                try {
+                    NamedList<Object> queryResponse;
+
+                    queryResponse = cluster.getSolrClient().request(request);
+
+                    int responseCode = (int) queryResponse.get("responseCode");
+
+                    if (responseCode == 200 /* HTTP OK */) {
+                        cancelledQueryIdsSet.add(Integer.parseInt(queryID));
+                    } else if (responseCode == 401 /* HTTP NOT FOUND */) {
+                        notFoundQueryIdSet.add(Integer.parseInt(queryID));
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage());
+                }
+            } catch (InterruptedException e) {

Review comment:
       Restore the interrupted status in this catch block by calling 
`Thread.currentThread().interrupt()`.

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("queryUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() {
+        Set<Integer> queryIdsSet = ConcurrentHashMap.newKeySet();
+        Set<Integer> notFoundIdsSet = ConcurrentHashMap.newKeySet();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {
+            CompletableFuture<Void> future = cancelQuery(Integer.toString(i), 
4000, queryIdsSet, notFoundIdsSet);
+
+            futures.add(future);
+        }
+
+        futures.forEach(CompletableFuture::join);
+
+        queryFutures.forEach(CompletableFuture::join);
+
+        assertEquals("Total query count did not match the expected value",
+                queryIdsSet.size() + notFoundIdsSet.size(), 90);
+    }
+
+    @Test
+    public void testListCancellableQueries() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/list");
+
+        for (int i = 0; i < 50; i++) {
+            executeQueryAsync(Integer.toString(i));
+        }
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        NamedList<String> result = (NamedList<String>) 
queryResponse.get("taskList");
+
+        Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+        Set<Integer> presentQueryIDs = new HashSet<>();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+
+            presentQueryIDs.add(Integer.parseInt(entry.getKey()));
+        }
+
+        assertTrue(presentQueryIDs.size() > 0 && presentQueryIDs.size() <= 50);
+
+        for (int value : presentQueryIDs) {
+            assertTrue(value >= 0 && value < 50);
+        }
+    }
+
+    @Test
+    public void testCheckSpecificQueryStatus() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("taskUUID", "25");
+
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+
+        request.setPath("/tasks/list");
+
+        NamedList<Object> queryResponse = 
cluster.getSolrClient().request(request);
+
+        @SuppressWarnings({"unchecked"})
+        String result = (String) queryResponse.get("taskStatus");
+
+        assertFalse(result.contains("true"));
+    }
+
+    private CompletableFuture<Void> cancelQuery(final String queryID, final 
int sleepTime, Set<Integer> cancelledQueryIdsSet,
+                                          Set<Integer> notFoundQueryIdSet) {
+        return CompletableFuture.runAsync(() -> {
+            ModifiableSolrParams params = new ModifiableSolrParams();
+
+            params.set("queryUUID", queryID);
+            @SuppressWarnings({"rawtypes"})
+            SolrRequest request = new QueryRequest(params);
+            request.setPath("/tasks/cancel");
+
+            // Wait for some time to let the query start
+            try {
+                if (sleepTime > 0) {
+                    Thread.sleep(sleepTime);
+                }
+
+                try {
+                    NamedList<Object> queryResponse;
+
+                    queryResponse = cluster.getSolrClient().request(request);
+
+                    int responseCode = (int) queryResponse.get("responseCode");
+
+                    if (responseCode == 200 /* HTTP OK */) {
+                        cancelledQueryIdsSet.add(Integer.parseInt(queryID));
+                    } else if (responseCode == 401 /* HTTP NOT FOUND */) {
+                        notFoundQueryIdSet.add(Integer.parseInt(queryID));
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage());

Review comment:
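       Wrap the original exception rather than only its message, so the cause and stack trace are preserved: `throw new CompletionException(e);`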

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");
+
+        executorService = 
ExecutorUtil.newMDCAwareCachedThreadPool("TestTaskManagement");
+
+        List<SolrInputDocument> docs = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SolrInputDocument doc = new SolrInputDocument();
+            doc.addField("id", i);
+            doc.addField("foo1_s", Integer.toString(i));
+            doc.addField("foo2_s", Boolean.toString(i % 2 == 0));
+            doc.addField("foo4_s", new BytesRef(Boolean.toString(i % 2 == 0)));
+
+            docs.add(doc);
+        }
+
+        cluster.getSolrClient().add(docs);
+        cluster.getSolrClient().commit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        
CollectionAdminRequest.deleteCollection(COLLECTION_NAME).process(cluster.getSolrClient());
+        executorService.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void testNonExistentQuery() throws Exception {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+
+        params.set("queryUUID", "foobar");
+        @SuppressWarnings({"rawtypes"})
+        SolrRequest request = new QueryRequest(params);
+        request.setPath("/tasks/cancel");
+
+        NamedList<Object> queryResponse;
+
+        queryResponse = cluster.getSolrClient().request(request);
+
+        assertEquals("Query with queryID foobar not found", 
queryResponse.get("status"));
+    }
+
+    @Test
+    public void testCancellationQuery() {
+        Set<Integer> queryIdsSet = ConcurrentHashMap.newKeySet();
+        Set<Integer> notFoundIdsSet = ConcurrentHashMap.newKeySet();
+
+        List<CompletableFuture<Void>> queryFutures = new ArrayList<>();
+
+        for (int i = 0; i < 100; i++) {
+            CompletableFuture<Void> future = 
executeQueryAsync(Integer.toString(i));
+
+            queryFutures.add(future);
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (int i = 0; i < 90; i++) {

Review comment:
       Why do we start 100 queries, but only cancel 90?

##########
File path: solr/core/src/test/org/apache/solr/search/TestTaskManagement.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+public class TestTaskManagement extends SolrCloudTestCase {
+    private static final String COLLECTION_NAME = "collection1";
+
+    private ExecutorService executorService;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        initCore("solrconfig.xml", "schema11.xml");
+
+        configureCluster(4)
+                .addConfig("conf", configset("sql"))
+                .configure();
+    }
+
+    @AfterClass
+    public static void tearDownCluster() throws Exception {
+        shutdownCluster();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setUp();
+
+        CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 1)
+                .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+                .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(COLLECTION_NAME, 2, 2);
+        cluster.getSolrClient().setDefaultCollection(COLLECTION_NAME);
+
+        cluster.getSolrClient().setDefaultCollection("collection1");

Review comment:
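       Duplicate: this repeats the `setDefaultCollection(COLLECTION_NAME)` call two lines above (`COLLECTION_NAME` is `"collection1"`), so this line can be removed.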

##########
File path: solr/core/src/java/org/apache/solr/search/CancellableCollector.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.search;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.FilterLeafCollector;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.solr.client.solrj.util.Cancellable;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, Cancellable {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private final Collector collector;
+  private final AtomicBoolean isQueryCancelled;
+
+  public CancellableCollector(Collector collector) {
+    Objects.requireNonNull(collector, "Internal collector not provided but 
wrapper collector accessed");
+
+    this.collector = collector;
+    this.isQueryCancelled = new AtomicBoolean();
+  }
+
+  @Override
+  public LeafCollector getLeafCollector(LeafReaderContext context) throws 
IOException {
+
+    if (isQueryCancelled.compareAndSet(true, true)) {
+      throw new QueryCancelledException();
+    }
+
+    return new FilterLeafCollector(collector.getLeafCollector(context)) {
+
+      @Override
+      public void collect(int doc) throws IOException {
+        if (isQueryCancelled.compareAndSet(true, true)) {

Review comment:
       same as above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to