[ https://issues.apache.org/jira/browse/SOLR-15036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249387#comment-17249387 ]
Timothy Potter commented on SOLR-15036:
---------------------------------------

[~jbernste] Does drill only support counting per dimension? For my test case, I'm expecting these tuples:

{code}
{sum(a_d)=5.894460990302005, a_i=0, count(*)=6, max(a_d)=2.2515625018914305, avg(a_d)=0.9824101650503341, min(a_d)=-0.5859583807765252}
{sum(a_d)=12.517492417715335, a_i=1, count(*)=6, max(a_d)=3.338305310115201, avg(a_d)=2.086248736285889, min(a_d)=0.03050220236482515}
{sum(a_d)=24.076139429000165, a_i=2, count(*)=6, max(a_d)=4.832815828279073, avg(a_d)=4.01268990483336, min(a_d)=3.16905458918893}
{sum(a_d)=22.583039803775907, a_i=3, count(*)=6, max(a_d)=5.66831997419713, avg(a_d)=3.7638399672959846, min(a_d)=2.902262184046103}
{sum(a_d)=28.243748570490624, a_i=4, count(*)=6, max(a_d)=6.531585917691583, avg(a_d)=4.707291428415104, min(a_d)=2.6395698661907963}
{sum(a_d)=37.88196903407075, a_i=5, count(*)=6, max(a_d)=7.555382540979672, avg(a_d)=6.313661505678458, min(a_d)=4.808772939476107}
{sum(a_d)=39.25679972070782, a_i=6, count(*)=6, max(a_d)=8.416136012729918, avg(a_d)=6.542799953451302, min(a_d)=5.422492404700898}
{sum(a_d)=46.7622185952807, a_i=7, count(*)=6, max(a_d)=8.667999236934058, avg(a_d)=7.793703099213451, min(a_d)=6.934577412906803}
{sum(a_d)=53.296172957938325, a_i=8, count(*)=6, max(a_d)=9.566181963643201, avg(a_d)=8.88269549298972, min(a_d)=7.4397380388592556}
{sum(a_d)=63.46244550204135, a_i=9, count(*)=6, max(a_d)=12.251349466753346, avg(a_d)=10.577074250340225, min(a_d)=9.232427215193514}
{code}

Which is what is produced by the facet expression in my description for this JIRA. But {{drill}} doesn't seem like it's aggregating by the {{a_i}} dimension as expected.

The following drill expression (w/o rollup for now) produces the following tuples for the same setup ^

{code}
drill(
  SOME_ALIAS_WITH_MANY_COLLS,
  q="*:*",
  fl="a_i",
  sort="a_i asc",
  rollup(
    input(),
    over="a_i",
    sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
  )
)
{code}

Produces Tuple stream:

{code}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=3, a_i=0, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=0, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=1, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=1, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=1, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=1, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=2, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=2, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=2, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=2, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=3, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=3, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=4, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=5, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=5, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=5, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=6, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=6, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=6, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=6, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=6, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=7, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=7, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=7, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=7, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=7, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=8, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=8, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=2, a_i=8, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=8, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=8, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{sum(a_d)=0.0, count(*)=1, a_i=9, avg(a_d)=0.0, max(a_d)=-1.7976931348623157E308, min(a_d)=1.7976931348623157E308}
{code}

As you can see, the metrics around {{a_d}} are all the same for differing {{a_i}} (they don't look right either way).

> Use plist automatically for executing a facet expression against a collection alias backed by multiple collections
> ------------------------------------------------------------------------------------------------------------------
>
> Key: SOLR-15036
> URL: https://issues.apache.org/jira/browse/SOLR-15036
> Project: Solr
> Issue Type: Improvement
> Security Level: Public(Default Security Level. Issues are Public)
> Components: streaming expressions
> Reporter: Timothy Potter
> Assignee: Timothy Potter
> Priority: Major
> Attachments: relay-approach.patch
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> For analytics use cases, streaming expressions make it possible to compute basic aggregations (count, min, max, sum, and avg) over massive data sets. Moreover, with massive data sets, it is common to use collection aliases over many underlying collections, for instance time-partitioned aliases backed by a set of collections, each covering a specific time range. In some cases, we can end up with many collections (think 50-60), each with 100s of shards. Aliases help insulate client applications from complex collection topologies on the server side.
>
> Let's take a basic facet expression that computes some useful aggregation metrics:
> {code:java}
> facet(
>   some_alias,
>   q="*:*",
>   fl="a_i",
>   sort="a_i asc",
>   buckets="a_i",
>   bucketSorts="count(*) asc",
>   bucketSizeLimit=10000,
>   sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)
> )
> {code}
> Behind the scenes, the {{FacetStream}} sends a JSON facet request to Solr, which then expands the alias to a list of collections. For each collection, the top-level distributed query controller gathers a candidate set of replicas to query and then scatters {{distrib=false}} queries to each replica in the list. For instance, if we have 60 collections with 200 shards each, then this results in 12,000 shard requests from the query controller node to the other nodes in the cluster. The requests are sent in an async manner (see {{SearchHandler}} and {{HttpShardHandler}}). In my testing, we’ve seen cases where we hit 18,000 replicas and these queries don’t always come back in a timely manner. Put simply, this also puts a lot of load on the top-level query controller node in terms of open connections and new object creation.
>
> Instead, we can use {{plist}} to send the JSON facet query to each collection in the alias in parallel, which reduces the overhead of each top-level distributed query from 12,000 to 200 in my example above. With this approach, you’ll then need to sort the tuples back from each collection and do a rollup, something like:
> {code:java}
> select(
>   rollup(
>     sort(
>       plist(
>         select(facet(coll1,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt),
>         select(facet(coll2,q="*:*", fl="a_i", sort="a_i asc", buckets="a_i", bucketSorts="count(*) asc", bucketSizeLimit=10000, sum(a_d), avg(a_d), min(a_d), max(a_d), count(*)),a_i,sum(a_d) as the_sum, avg(a_d) as the_avg, min(a_d) as the_min, max(a_d) as the_max, count(*) as cnt)
>       ),
>       by="a_i asc"
>     ),
>     over="a_i",
>     sum(the_sum), avg(the_avg), min(the_min), max(the_max), sum(cnt)
>   ),
>   a_i, sum(the_sum) as the_sum, avg(the_avg) as the_avg, min(the_min) as the_min, max(the_max) as the_max, sum(cnt) as cnt
> )
> {code}
> One thing to point out is that you can’t just average the averages back from each collection in the rollup. It needs to be a *weighted avg.* when rolling up the avg. from each facet expression in the plist. However, we have the count per collection, so this is doable but will require some changes to the rollup expression to support weighted average.
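
The weighted-average point in the description can be illustrated outside of Solr. The following is a minimal Python sketch, not the rollup implementation: the helper name `merge_buckets` and the sample numbers are made up for illustration, while the field names (`the_sum`, `the_avg`, `the_min`, `the_max`, `cnt`) follow the aliases used in the plist expression in the description.

```python
def merge_buckets(per_collection_tuples):
    """Merge per-collection facet tuples that share the same a_i bucket.

    Hypothetical helper for illustration only; it is not part of Solr.
    The average is recombined as a count-weighted average.
    """
    merged = {}
    for t in per_collection_tuples:
        key = t["a_i"]
        if key not in merged:
            merged[key] = {
                "a_i": key,
                "the_sum": 0.0,
                "the_min": float("inf"),
                "the_max": float("-inf"),
                "cnt": 0,
                "weighted": 0.0,  # running sum of avg * count
            }
        m = merged[key]
        m["the_sum"] += t["the_sum"]
        m["the_min"] = min(m["the_min"], t["the_min"])
        m["the_max"] = max(m["the_max"], t["the_max"])
        m["cnt"] += t["cnt"]
        m["weighted"] += t["the_avg"] * t["cnt"]

    result = []
    for key in sorted(merged):
        m = merged[key]
        weighted = m.pop("weighted")
        # Weighted average: each collection's avg counts in proportion to its cnt.
        m["the_avg"] = weighted / m["cnt"] if m["cnt"] else 0.0
        result.append(m)
    return result


# Made-up tuples for one bucket as returned by two collections.
tuples = [
    {"a_i": 0, "the_sum": 2.0, "the_avg": 2.0, "the_min": 2.0, "the_max": 2.0, "cnt": 1},
    {"a_i": 0, "the_sum": 12.0, "the_avg": 4.0, "the_min": 3.0, "the_max": 5.0, "cnt": 3},
]
merged = merge_buckets(tuples)
naive_avg = sum(t["the_avg"] for t in tuples) / len(tuples)
print(merged[0]["the_avg"], naive_avg)
```

With these sample values the plain average of the two per-collection averages differs from the count-weighted one, which is the discrepancy the description warns about when `avg(the_avg)` is used in the rollup.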
>
> While this plist approach is doable, it’s a pain for users to have to create the rollup / sort over plist expression for collection aliases. After all, aliases are supposed to hide these types of complexities from client applications!
>
> The point of this ticket is to investigate the feasibility of auto-wrapping the facet expression with a rollup / sort / plist when the collection argument is an alias with multiple collections; other stream sources will be considered after facet is proven out.
>
> Lastly, I also considered an alternative approach of doing a parallel relay on the server side. The idea is similar to {{plist}} but instead of this being driven on the client side, the {{FacetModule}} can create intermediate queries (I called them {{relay}} queries in my impl.) that help distribute the load. In my example above, there would be 60 such relay queries, each sent to a replica for each collection in the alias, which then sends the {{distrib=false}} queries to each replica. The relay query response handler collects the facet responses from each replica before sending back to the top-level query controller for final results.
>
> I have this approach working in the attached patch ([^relay-approach.patch]) but it feels a little invasive to the {{FacetModule}} (and the distributed query inner workings in general). To me, the auto-{{plist}} approach feels like a better layer to add this functionality vs. deeper down in the facet module code. Moreover, we may be able to leverage the {{plist}} solution for other stream sources whereas the relay approach required me to change logic in the {{FacetModule}} directly, so is likely not as reusable for other types of queries. It's possible the relay approach could be generalized but I'm not clear how useful that would be beyond streaming expression analytics use cases; feedback on this point welcome of course.
>
> I also think {{plist}} should try to be clever and avoid sending the top-level (per collection) request to the same node if it can help it.