[ https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249765#comment-17249765 ]
Timothy Potter commented on SOLR-15036:
---------------------------------------

Confirmed, it works nicely now! Thanks for your help Joel

{code}
{count(*)=6, a_i=0, max(max(a_d))=2.2515625018914305, min(min(a_d))=-0.5859583807765252, sum(sum(a_d))=5.894460990302006, wsum(avg(a_d), count(*))=0.9824101650503342}
{count(*)=4, a_i=1, max(max(a_d))=3.338305310115201, min(min(a_d))=0.03050220236482515, sum(sum(a_d))=12.517492417715335, wsum(avg(a_d), count(*))=2.086248736285889}
{count(*)=4, a_i=2, max(max(a_d))=4.832815828279073, min(min(a_d))=3.16905458918893, sum(sum(a_d))=24.076139429000165, wsum(avg(a_d), count(*))=4.012689904833361}
{count(*)=4, a_i=3, max(max(a_d))=5.66831997419713, min(min(a_d))=2.902262184046103, sum(sum(a_d))=22.58303980377591, wsum(avg(a_d), count(*))=3.763839967295984}
{count(*)=4, a_i=4, max(max(a_d))=6.531585917691583, min(min(a_d))=2.6395698661907963, sum(sum(a_d))=28.243748570490624, wsum(avg(a_d), count(*))=4.707291428415103}
{count(*)=5, a_i=5, max(max(a_d))=7.555382540979672, min(min(a_d))=4.808772939476107, sum(sum(a_d))=37.88196903407075, wsum(avg(a_d), count(*))=6.313661505678459}
{count(*)=5, a_i=6, max(max(a_d))=8.416136012729918, min(min(a_d))=5.422492404700898, sum(sum(a_d))=39.25679972070782, wsum(avg(a_d), count(*))=6.542799953451303}
{count(*)=5, a_i=7, max(max(a_d))=8.667999236934058, min(min(a_d))=6.934577412906803, sum(sum(a_d))=46.7622185952807, wsum(avg(a_d), count(*))=7.793703099213451}
{count(*)=5, a_i=8, max(max(a_d))=9.566181963643201, min(min(a_d))=7.4397380388592556, sum(sum(a_d))=53.296172957938325, wsum(avg(a_d), count(*))=8.88269549298972}
{count(*)=4, a_i=9, max(max(a_d))=12.251349466753346, min(min(a_d))=9.232427215193514, sum(sum(a_d))=63.46244550204135, wsum(avg(a_d), count(*))=10.577074250340223}
{code}

Use plist automatically for executing a facet expression against a collection alias backed by multiple collections
------------------------------------------------------------------------------------------------------------------

Key: SOLR-15036
URL: https://issues.apache.org/jira/browse/SOLR-15036
Project: Solr
Issue Type: Improvement
Security Level: Public (Default Security Level. Issues are Public)
Components: streaming expressions
Reporter: Timothy Potter
Assignee: Timothy Potter
Priority: Major
Attachments: relay-approach.patch
Time Spent: 0.5h
Remaining Estimate: 0h

For analytics use cases, streaming expressions make it possible to compute basic aggregations (count, min, max, sum, and avg) over massive data sets. Moreover, with massive data sets it is common to use collection aliases over many underlying collections, for instance time-partitioned aliases backed by a set of collections, each covering a specific time range. In some cases we can end up with many collections (think 50-60), each with hundreds of shards. Aliases help insulate client applications from complex collection topologies on the server side.

Let's take a basic facet expression that computes some useful aggregation metrics:

{code:java}
facet(
  some_alias,
  q="*:*",
  fl="a_i",
  sort="a_i asc",
  buckets="a_i",
  bucketSorts="count(*) asc",
  bucketSizeLimit=10000,
  sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
)
{code}

Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr, which then expands the alias to a list of collections. For each collection, the top-level distributed query controller gathers a candidate set of replicas to query and then scatters {{distrib=false}} queries to each replica in the list. For instance, if we have 60 collections with 200 shards each, this results in 12,000 shard requests from the query controller node to the other nodes in the cluster.
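To make the fan-out arithmetic concrete, here is a small illustrative calculation (plain Java, not Solr code; the method name is hypothetical):

```java
// Back-of-the-envelope fan-out for a facet query against an alias:
// a single query controller issues one distrib=false request per shard
// of every collection behind the alias.
public class FanOut {
    static int directShardRequests(int collections, int shardsPerCollection) {
        return collections * shardsPerCollection;
    }

    public static void main(String[] args) {
        // 60 collections x 200 shards each, as in the example above.
        System.out.println(directShardRequests(60, 200)); // prints 12000
    }
}
```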
The requests are sent in an async manner (see {{SearchHandler}} and {{HttpShardHandler}}). In my testing, we’ve seen cases where we hit 18,000 replicas, and these queries don’t always come back in a timely manner. Put simply, this also puts a lot of load on the top-level query controller node in terms of open connections and new object creation.

Instead, we can use {{plist}} to send the JSON facet query to each collection in the alias in parallel, which reduces the overhead of each top-level distributed query from 12,000 to 200 in my example above. With this approach, you’ll then need to sort the tuples back from each collection and do a rollup, something like:

{code:java}
select(
  rollup(
    sort(
      plist(
        select(facet(coll1, q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)), a_i, sum(a_d) as the_sum, avg(a_d) as the_avg, min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
        select(facet(coll2, q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)), a_i, sum(a_d) as the_sum, avg(a_d) as the_avg, min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
      ),
      by="a_i asc"
    ),
    over="a_i",
    sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
  ),
  a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as the_min, max(the_max) as the_max, sum(cnt) as cnt
)
{code}

One thing to point out is that you can’t just average the averages back from each collection in the rollup. It needs to be a *weighted avg.* when rolling up the avg. from each facet expression in the plist. However, we have the count per collection, so this is doable, but it will require some changes to the rollup expression to support weighted average.
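The weighted-average rollup described above can be sketched in plain Java (method and variable names here are illustrative, not Solr's actual rollup code): each collection's average is weighted by that collection's tuple count, which is exactly the `cnt` field the per-collection facet expressions already return.

```java
// Sketch: rolling up per-collection averages into a weighted average.
// avgs/counts are hypothetical stand-ins for the the_avg and cnt fields
// coming back from each per-collection facet expression.
public class WeightedAvgRollup {
    static double weightedAvg(double[] avgs, long[] counts) {
        double weightedSum = 0.0;
        long totalCount = 0;
        for (int i = 0; i < avgs.length; i++) {
            weightedSum += avgs[i] * counts[i]; // re-derive each collection's sum
            totalCount += counts[i];
        }
        return weightedSum / totalCount;
    }

    public static void main(String[] args) {
        // Two collections: avg 2.0 over 4 docs, avg 5.0 over 6 docs.
        // Naive avg of avgs = 3.5; weighted avg = (8 + 30) / 10 = 3.8.
        System.out.println(weightedAvg(new double[]{2.0, 5.0}, new long[]{4, 6}));
    }
}
```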
While this plist approach is doable, it’s a pain for users to have to create the rollup / sort over plist expression for collection aliases. After all, aliases are supposed to hide these types of complexities from client applications!

The point of this ticket is to investigate the feasibility of auto-wrapping the facet expression with a rollup / sort / plist when the collection argument is an alias with multiple collections; other stream sources will be considered after facet is proven out.

Lastly, I also considered an alternative approach of doing a parallel relay on the server side. The idea is similar to {{plist}}, but instead of this being driven on the client side, the {{FacetModule}} can create intermediate queries (I called them {{relay}} queries in my impl.) that help distribute the load. In my example above, there would be 60 such relay queries, one sent to a replica of each collection in the alias, which then sends the {{distrib=false}} queries to each replica. The relay query response handler collects the facet responses from each replica before sending them back to the top-level query controller for final results.

I have this approach working in the attached patch ([^relay-approach.patch]), but it feels a little invasive to the {{FacetModule}} (and the distributed query inner workings in general). To me, the auto-{{plist}} approach feels like a better layer at which to add this functionality vs. deeper down in the facet module code. Moreover, we may be able to leverage the {{plist}} solution for other stream sources, whereas the relay approach required me to change logic in the {{FacetModule}} directly, so it is likely not as reusable for other types of queries. It's possible the relay approach could be generalized, but I'm not clear how useful that would be beyond streaming expression analytics use cases; feedback on this point is welcome, of course.
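The auto-wrapping idea might look roughly like the following sketch (plain Java with hypothetical class and method names; the real change would live in Solr's streaming expression code and would also add the sort / rollup wrapper to merge the per-collection buckets):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch (not the actual FacetStream implementation): when the
// collection argument resolves to multiple collections behind an alias,
// rewrite the facet expression into a parallel plist over those collections.
public class AutoPlistSketch {
    static String autoPlist(String facetExpr, String alias, List<String> backing) {
        if (backing.size() <= 1) {
            return facetExpr; // concrete collection or single-collection alias: nothing to wrap
        }
        // Naive string substitution is just for illustration; a real
        // implementation would rewrite the parsed expression tree.
        String inner = backing.stream()
                .map(coll -> facetExpr.replace(alias, coll))
                .collect(Collectors.joining(", "));
        // A real implementation would also wrap this in sort(...) and
        // rollup(...) to merge the per-collection buckets.
        return "plist(" + inner + ")";
    }

    public static void main(String[] args) {
        String expr = "facet(some_alias, q=\"*:*\")";
        System.out.println(autoPlist(expr, "some_alias", Arrays.asList("coll1", "coll2")));
    }
}
```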
I also think {{plist}} should try to be clever and avoid sending the top-level (per collection) request to the same node if it can help it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)