[ https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17261539#comment-17261539 ]
Joel Bernstein edited comment on SOLR-15036 at 1/8/21, 7:25 PM:
----------------------------------------------------------------

One thing we could do to remove any risks and move faster with this would be to make the default behavior be the standard non-plist approach. Then we could turn it on with a named param or a system param.

{code}
facet(alias, tiered="true")
{code}

The tiered parameter would turn on the tiered aggregations.


was (Author: joel.bernstein):
One thing we could do to remove any risks and move faster with this would be to make the default behavior be the standard non-plist approach. Then we could turn it on with a named param or a system param.

> Use plist automatically for executing a facet expression against a collection
> alias backed by multiple collections
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SOLR-15036
>                 URL: https://issues.apache.org/jira/browse/SOLR-15036
>             Project: Solr
>          Issue Type: Improvement
>      Security Level: Public(Default Security Level. Issues are Public)
>          Components: streaming expressions
>            Reporter: Timothy Potter
>            Assignee: Timothy Potter
>            Priority: Major
>         Attachments: relay-approach.patch
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute
> basic aggregations (count, min, max, sum, and avg) over massive data sets.
> Moreover, with massive data sets, it is common to use collection aliases over
> many underlying collections, for instance time-partitioned aliases backed by
> a set of collections, each covering a specific time range. In some cases, we
> can end up with many collections (think 50-60) each with 100's of shards.
> Aliases help insulate client applications from complex collection topologies
> on the server side.
> Let's take a basic facet expression that computes some useful aggregation
> metrics:
> {code:java}
> facet(
>   some_alias,
>   q="*:*",
>   fl="a_i",
>   sort="a_i asc",
>   buckets="a_i",
>   bucketSorts="count(*) asc",
>   bucketSizeLimit=10000,
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr
> which then expands the alias to a list of collections. For each collection,
> the top-level distributed query controller gathers a candidate set of
> replicas to query and then scatters {{distrib=false}} queries to each replica
> in the list. For instance, if we have 60 collections with 200 shards each,
> then this results in 12,000 shard requests from the query controller node to
> the other nodes in the cluster. The requests are sent in an async manner (see
> {{SearchHandler}} and {{HttpShardHandler}}). In my testing, we’ve seen cases
> where we hit 18,000 replicas and these queries don’t always come back in a
> timely manner. Put simply, this also puts a lot of load on the top-level
> query controller node in terms of open connections and new object creation.
> Instead, we can use {{plist}} to send the JSON facet query to each collection
> in the alias in parallel, which reduces the fan-out of each top-level
> distributed query from 12,000 shard requests to 200 in my example above.
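
To make the fan-out numbers above concrete, here is a minimal sketch of the arithmetic in Java. The class and method names are hypothetical and not part of Solr, and it assumes one request per shard, as in the 60 x 200 example.

```java
// Hypothetical helper, not part of Solr: compares how many shard requests a
// single query controller has to send under each approach.
final class FanOutEstimate {

    // One controller expands the alias and queries every shard itself.
    static long singleController(int collections, int shardsPerCollection) {
        return (long) collections * shardsPerCollection;
    }

    // With plist, each collection gets its own top-level query, so each
    // controller only fans out to the shards of one collection.
    static long perCollectionController(int shardsPerCollection) {
        return shardsPerCollection;
    }

    public static void main(String[] args) {
        int collections = 60;
        int shards = 200;
        System.out.println("single controller: " + singleController(collections, shards));
        System.out.println("per-collection controller: " + perCollectionController(shards));
    }
}
```

The total number of shard requests across the cluster stays the same (60 x 200); what shrinks is the number any one controller node has to send and track.
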
> With this approach, you’ll then need to sort the tuples back from each
> collection and do a rollup, something like:
> {code:java}
> select(
>   rollup(
>     sort(
>       plist(
>         select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i",
>           bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d),
>           min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg,
>           min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
>         select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i",
>           bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d),
>           min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg,
>           min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>       ),
>       by="a_i asc"
>     ),
>     over="a_i",
>     sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as
>   the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just average the averages back from
> each collection in the rollup. It needs to be a *weighted average* when
> rolling up the averages from each facet expression in the plist. However, we
> have the count per collection, so this is doable but will require some
> changes to the rollup expression to support weighted average.
> While this plist approach is doable, it’s a pain for users to have to create
> the rollup / sort over plist expression for collection aliases. After all,
> aliases are supposed to hide these types of complexities from client
> applications!
> The point of this ticket is to investigate the feasibility of auto-wrapping
> the facet expression with a rollup / sort / plist when the collection
> argument is an alias with multiple collections; other stream sources will be
> considered after facet is proven out.
> Lastly, I also considered an alternative approach of doing a parallel relay
> on the server side.
> The idea is similar to {{plist}} but instead of this
> being driven on the client side, the {{FacetModule}} can create intermediate
> queries (I called them {{relay}} queries in my implementation) that help
> distribute the load. In my example above, there would be 60 such relay
> queries, one per collection in the alias, each sent to a replica of that
> collection, which then sends the {{distrib=false}} queries to each replica
> of that collection. The relay query response handler collects the facet
> responses from each replica before sending them back to the top-level query
> controller for final results.
> I have this approach working in the attached patch ([^relay-approach.patch])
> but it feels a little invasive to the {{FacetModule}} (and the distributed
> query inner workings in general). To me, the auto-{{plist}} approach feels
> like a better layer to add this functionality vs. deeper down in the facet
> module code. Moreover, we may be able to leverage the {{plist}} solution for
> other stream sources whereas the relay approach required me to change logic
> in the {{FacetModule}} directly, so is likely not as reusable for other types
> of queries. It's possible the relay approach could be generalized but I'm not
> clear how useful that would be beyond streaming expression analytics use
> cases; feedback on this point is welcome, of course.
> I also think {{plist}} should try to be clever and avoid sending the
> top-level (per collection) request to the same node if it can help it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org
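
On the weighted average point in the description: a minimal sketch of what the rollup would need to compute for avg, written as plain Java rather than as a change to the rollup expression itself. The class and method names are hypothetical; the inputs stand for the per-collection the_avg and cnt values of one bucket.

```java
// Hypothetical sketch, not Solr code: combines per-collection averages for
// one bucket into a single average, weighting each by its tuple count.
final class WeightedAvgRollup {

    static double weightedAvg(double[] avgs, long[] counts) {
        if (avgs.length != counts.length) {
            throw new IllegalArgumentException("avgs and counts must have the same length");
        }
        double weightedSum = 0.0;
        long total = 0;
        for (int i = 0; i < avgs.length; i++) {
            weightedSum += avgs[i] * counts[i]; // avg * count recovers that collection's sum
            total += counts[i];
        }
        // No tuples at all: there is no meaningful average to report.
        return total == 0 ? Double.NaN : weightedSum / total;
    }

    public static void main(String[] args) {
        // coll1: avg 10.0 over 1 doc, coll2: avg 20.0 over 3 docs.
        double[] avgs = {10.0, 20.0};
        long[] counts = {1, 3};
        System.out.println(weightedAvg(avgs, counts)); // prints 17.5, not the naive 15.0
    }
}
```

Since the expression above already rolls up sum(the_sum) and sum(cnt), dividing those two rolled-up values should give the same result without needing the per-collection averages at all.
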