jbampton commented on a change in pull request #2132: URL: https://github.com/apache/lucene-solr/pull/2132#discussion_r543443487
########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java ########## @@ -351,25 +361,26 @@ public String getCollection() { FieldComparator[] comps = new FieldComparator[sorts.length]; for(int i=0; i<sorts.length; i++) { Review comment: ```suggestion for (int i = 0; i < sorts.length; i++) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java ########## @@ -351,25 +361,26 @@ public String getCollection() { FieldComparator[] comps = new FieldComparator[sorts.length]; for(int i=0; i<sorts.length; i++) { - String s = sorts[i]; + comps[i] = parseSortClause(sorts[i]); + } - String fieldName = null; - String order = null; + return comps; + } - if(s.endsWith("asc") || s.endsWith("ASC")) { - order = "asc"; - fieldName = s.substring(0, s.length()-3).trim().replace(" ", ""); - } else if(s.endsWith("desc") || s.endsWith("DESC")) { - order = "desc"; - fieldName = s.substring(0, s.length()-4).trim().replace(" ", ""); - } else { - throw new IOException(String.format(Locale.ROOT,"invalid expression - bad bucketSort '%s'.",bucketSortString)); - } - - comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING); + private FieldComparator parseSortClause(final String s) throws IOException { + String fieldName = null; + String order = null; + if(s.endsWith("asc") || s.endsWith("ASC")) { + order = "asc"; + fieldName = s.substring(0, s.length()-3).trim().replace(" ", ""); + } else if(s.endsWith("desc") || s.endsWith("DESC")) { Review comment: ```suggestion } else if (s.endsWith("desc") || s.endsWith("DESC")) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java ########## @@ -842,10 +874,73 @@ public int getCost() { @Override public StreamComparator getStreamSort() { - if(bucketSorts.length > 1) { - return new MultipleFieldComparator(bucketSorts); + return (bucketSorts.length > 1) ? new MultipleFieldComparator(bucketSorts) : bucketSorts[0]; + } + + @Override + public TupleStream[] parallelize(List<String> partitions) throws IOException { + TupleStream[] parallelStreams = new TupleStream[partitions.size()]; + + // prefer a different node for each collection if possible as we don't want the same remote node + // being the coordinator if possible, otherwise, our plist isn't distributing the load as well + final Set<String> preferredNodes = new HashSet<>(Math.max((int) (parallelStreams.length/.75f) + 1, 16)); + + for (int c=0; c < parallelStreams.length; c++) { Review comment: ```suggestion for (int c = 0; c < parallelStreams.length; c++) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MinMetric.java ########## @@ -87,7 +87,7 @@ public void update(Tuple tuple) { if(l < longMin) { longMin = l; } - } else { + } else if(o instanceof Long) { Review comment: ```suggestion } else if (o instanceof Long) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java ########## @@ -351,25 +361,26 @@ public String getCollection() { FieldComparator[] comps = new FieldComparator[sorts.length]; for(int i=0; i<sorts.length; i++) { - String s = sorts[i]; + comps[i] = parseSortClause(sorts[i]); + } - String fieldName = null; - String order = null; + return comps; + } - if(s.endsWith("asc") || s.endsWith("ASC")) { - order = "asc"; - fieldName = s.substring(0, s.length()-3).trim().replace(" ", ""); - } else if(s.endsWith("desc") || s.endsWith("DESC")) { - order = "desc"; - fieldName = s.substring(0, s.length()-4).trim().replace(" ", ""); - } else { - throw new IOException(String.format(Locale.ROOT,"invalid expression - bad bucketSort '%s'.",bucketSortString)); - } - - comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING); + private FieldComparator parseSortClause(final String s) throws IOException { + String fieldName = null; + String order = null; + if(s.endsWith("asc") || s.endsWith("ASC")) { + order = "asc"; + fieldName = s.substring(0, s.length()-3).trim().replace(" ", ""); + } else if(s.endsWith("desc") || s.endsWith("DESC")) { + order = "desc"; + fieldName = s.substring(0, s.length()-4).trim().replace(" ", ""); + } else { + throw new IOException(String.format(Locale.ROOT,"invalid expression - bad sort caluse '%s'.",s)); Review comment: ```suggestion throw new IOException(String.format(Locale.ROOT, "invalid expression - bad sort caluse '%s'.", s)); ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; +import org.apache.solr.client.solrj.io.stream.metrics.Metric; +import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; +import org.apache.solr.client.solrj.io.stream.metrics.WeightedSumMetric; + +/** + * Indicates the underlying stream source supports parallelizing metrics computation across collections + * using a rollup of metrics from each collection. + */ +public interface ParallelMetricsRollup { + TupleStream[] parallelize(List<String> partitions) throws IOException; + StreamComparator getParallelListSortOrder() throws IOException; + RollupStream getRollupStream(SortStream sortStream, Metric[] rollupMetrics) throws IOException; + Map<String,String> getRollupSelectFields(Metric[] rollupMetrics); + + default Optional<TupleStream> openParallelStream(StreamContext context, List<String> partitions, Metric[] metrics) throws IOException { + Optional<Metric[]> maybeRollupMetrics = getRollupMetrics(metrics); + if (maybeRollupMetrics.isEmpty()) + return Optional.empty(); // some metric is incompatible with doing a rollup over the plist results + + TupleStream[] parallelStreams = parallelize(partitions); + + // the tuples from each plist need to be sorted using the same order to do a rollup + Metric[] rollupMetrics = maybeRollupMetrics.get(); + StreamComparator comparator = getParallelListSortOrder(); + SortStream sortStream = new SortStream(new ParallelListStream(parallelStreams), comparator); + RollupStream rollup = getRollupStream(sortStream, rollupMetrics); + SelectStream select = new SelectStream(rollup, getRollupSelectFields(rollupMetrics)); + select.setStreamContext(context); + select.open(); + + return Optional.of(select); + } + + default Optional<Metric[]> getRollupMetrics(Metric[] metrics) { + Metric[] rollup = new Metric[metrics.length]; + CountMetric count = null; + for (int m=0; m < rollup.length; m++) { Review comment: ```suggestion for (int m = 0; m < rollup.length; m++) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java ########## @@ -351,25 +361,26 @@ public String getCollection() { FieldComparator[] comps = new FieldComparator[sorts.length]; for(int i=0; i<sorts.length; i++) { - String s = sorts[i]; + comps[i] = parseSortClause(sorts[i]); + } - String fieldName = null; - String order = null; + return comps; + } - if(s.endsWith("asc") || s.endsWith("ASC")) { - order = "asc"; - fieldName = s.substring(0, s.length()-3).trim().replace(" ", ""); - } else if(s.endsWith("desc") || s.endsWith("DESC")) { - order = "desc"; - fieldName = s.substring(0, s.length()-4).trim().replace(" ", ""); - } else { - throw new IOException(String.format(Locale.ROOT,"invalid expression - bad bucketSort '%s'.",bucketSortString)); - } - - comps[i] = new FieldComparator(fieldName, order.equalsIgnoreCase("asc") ? ComparatorOrder.ASCENDING : ComparatorOrder.DESCENDING); + private FieldComparator parseSortClause(final String s) throws IOException { + String fieldName = null; + String order = null; + if(s.endsWith("asc") || s.endsWith("ASC")) { Review comment: ```suggestion if (s.endsWith("asc") || s.endsWith("ASC")) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.metrics.CountMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; +import org.apache.solr.client.solrj.io.stream.metrics.Metric; +import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; +import org.apache.solr.client.solrj.io.stream.metrics.WeightedSumMetric; + +/** + * Indicates the underlying stream source supports parallelizing metrics computation across collections + * using a rollup of metrics from each collection. + */ +public interface ParallelMetricsRollup { + TupleStream[] parallelize(List<String> partitions) throws IOException; + StreamComparator getParallelListSortOrder() throws IOException; + RollupStream getRollupStream(SortStream sortStream, Metric[] rollupMetrics) throws IOException; + Map<String,String> getRollupSelectFields(Metric[] rollupMetrics); + + default Optional<TupleStream> openParallelStream(StreamContext context, List<String> partitions, Metric[] metrics) throws IOException { + Optional<Metric[]> maybeRollupMetrics = getRollupMetrics(metrics); + if (maybeRollupMetrics.isEmpty()) + return Optional.empty(); // some metric is incompatible with doing a rollup over the plist results + + TupleStream[] parallelStreams = parallelize(partitions); + + // the tuples from each plist need to be sorted using the same order to do a rollup + Metric[] rollupMetrics = maybeRollupMetrics.get(); + StreamComparator comparator = getParallelListSortOrder(); + SortStream sortStream = new SortStream(new ParallelListStream(parallelStreams), comparator); + RollupStream rollup = getRollupStream(sortStream, rollupMetrics); + SelectStream select = new SelectStream(rollup, getRollupSelectFields(rollupMetrics)); + select.setStreamContext(context); + select.open(); + + return Optional.of(select); + } + + default Optional<Metric[]> getRollupMetrics(Metric[] metrics) { + Metric[] rollup = new Metric[metrics.length]; + CountMetric count = null; + for (int m=0; m < rollup.length; m++) { + Metric nextRollup; + Metric next = metrics[m]; + if (next instanceof SumMetric) { + // sum of sums + nextRollup = new SumMetric(next.getIdentifier()); + } else if (next instanceof MinMetric) { + // min of mins + nextRollup = new MinMetric(next.getIdentifier()); + } else if (next instanceof MaxMetric) { + // max of max + nextRollup = new MaxMetric(next.getIdentifier()); + } else if (next instanceof CountMetric) { + // sum of counts + nextRollup = new SumMetric(next.getIdentifier()); + count = (CountMetric)next; + } else if (next instanceof MeanMetric) { + // WeightedSumMetric must have a count to compute the weighted avg. rollup from ... + // if the user is not requesting count, then we can't parallelize + if (count == null) { + // just look past the current position + for (int n=m+1; n < metrics.length; n++) { Review comment: ```suggestion for (int n = m+1; n < metrics.length; n++) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/WeightedSumMetric.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream.metrics; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; + +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class WeightedSumMetric extends Metric { + + public static final String FUNC = "wsum"; + + private static final class Part { + private final double value; + private final long count; + + Part(long count, double value) { + this.count = count; + this.value = value; + } + + double weighted(final long total) { + return ((double) count / total) * value; + } + } + + private String valueCol; + private String countCol; + private List<Part> parts; + + public WeightedSumMetric(String valueCol, String countCol) { + init(valueCol, countCol, false); + } + + public WeightedSumMetric(String valueCol, String countCol, boolean outputLong) { + init(valueCol, countCol, outputLong); + } + + public WeightedSumMetric(StreamExpression expression, StreamFactory factory) throws IOException { + // grab all parameters out + String functionName = expression.getFunctionName(); + if (!FUNC.equals(functionName)) { + throw new IOException("Expected '" + FUNC + "' function but found " + functionName); + } + String valueCol = factory.getValueOperand(expression, 0); + String countCol = factory.getValueOperand(expression, 1); + String outputLong = factory.getValueOperand(expression, 2); + + // validate expression contains only what we want. + if (null == valueCol) { + throw new IOException(String.format(Locale.ROOT, "Invalid expression %s - expected %s(valueCol,countCol)", expression, FUNC)); + } + + boolean ol = false; + if (outputLong != null) { + ol = Boolean.parseBoolean(outputLong); + } + + init(valueCol, countCol, ol); + } + + private void init(String valueCol, String countCol, boolean outputLong) { + this.valueCol = valueCol; + this.countCol = countCol != null ? countCol : "count(*)"; + this.outputLong = outputLong; + setFunctionName(FUNC); + setIdentifier(FUNC, "(", valueCol, ", " + countCol + ")"); + } + + public void update(Tuple tuple) { + Object c = tuple.get(countCol); + Object o = tuple.get(valueCol); + if (c instanceof Number && o instanceof Number) { + if (parts == null) { + parts = new LinkedList<>(); + } + Number count = (Number) c; + Number value = (Number) o; + parts.add(new Part(count.longValue(), value.doubleValue())); + } + } + + public Metric newInstance() { + return new WeightedSumMetric(valueCol, countCol, outputLong); + } + + public String[] getColumns() { + return new String[]{valueCol, countCol}; + } + + public Number getValue() { + long total = sumCounts(); + double wavg = 0d; + for (Part next : parts) { + wavg += next.weighted(total); + } + return outputLong ? Math.round(wavg) : wavg; + } + + private long sumCounts() { + long total = 0L; + for (Part next : parts) { + total += next.count; + } + return total; + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + return new StreamExpression(getFunctionName()).withParameter(valueCol).withParameter(countCol).withParameter(Boolean.toString(outputLong)); + } +} Review comment: ```suggestion } ``` ########## File path: solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/ParallelFacetStreamOverAliasTest.java ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.client.solrj.io.stream; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.commons.math3.random.RandomGenerator; +import org.apache.commons.math3.util.Precision; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.handler.SolrDefaultStreamFactory; +import org.apache.solr.util.LogLevel; +import org.apache.solr.util.RTimer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * objective of this test suite is to test scalability of Streaming expressions for large deployments, + * for example where there are many collections with high sharding and each collection has millions of documents + */ +@SolrTestCaseJ4.SuppressSSL +@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"}) +@LogLevel("org.apache.solr.client.solrj.io.stream=INFO;org.apache.solr.common.cloud.ZkStateReader=WARN;org.apache.solr.metrics=WARN;org.apache.solr.core.SolrCore=WARN;org.apache.solr.cloud=WARN;org.apache.solr.update=WARN;org.apache.solr.rest=ERROR;org.apache.solr.servlet.HttpSolrCall=WARN;org.apache.solr=WARN;org.apache.solr.client.solrj.impl=INFO") +public class ParallelFacetStreamOverAliasTest extends SolrCloudTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String ALIAS_NAME = "SOME_ALIAS_WITH_MANY_COLLS"; + + private static final String id = "id"; + private static final int NUM_COLLECTIONS = 2; + private static final int NUM_DOCS_PER_COLLECTION = 40; + private static final int NUM_SHARDS_PER_COLLECTION = 4; + private static final int CARD = 10; + + private static List<String> listOfCollections; + private static final RandomGenerator rand = new JDKRandomGenerator(5150); + + @BeforeClass + public static void setupCluster() throws Exception { + final RTimer timer = new RTimer(); + configureCluster(NUM_COLLECTIONS).withMetrics(false) + .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf")) + .configure(); + cleanup(); + setupCollectionsAndAlias(); + + if (log.isInfoEnabled()) + log.info("Took {}ms to setup cluster with {} collections", timer.getTime(), NUM_COLLECTIONS); + } + + /** + * setup the testbed with necessary collections, documents, and alias + */ + public static void setupCollectionsAndAlias() throws Exception { + + final NormalDistribution[] dists = new NormalDistribution[CARD]; + for (int i=0; i < dists.length; i++) { Review comment: ```suggestion for (int i = 0; i < dists.length; i++) { ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/metrics/MaxMetric.java ########## @@ -86,7 +86,7 @@ public void update(Tuple tuple) { if(l > longMax) { longMax = l; } - } else { + } else if(o instanceof Long) { Review comment: ```suggestion } else if (o instanceof Long) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org