Thanks for all this info, Joel. I found that if I artificially limit the triples stream to 3M and use the /export handler with only 2 workers, I can get results in about 20 seconds and Solr doesn't tip over. That seems to be the best config for this local/single-instance setup.
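For reference, this is roughly the expression I'm running now. The subject_id range is just a made-up cap to keep the triples stream around 3M for local testing, and I've put the parallel sort on the join key since that's the order the innerJoin actually emits (I can re-sort by subject_id afterwards if I need that ordering):

    parallel(triple,
      innerJoin(
        search(triple, q="subject_id:[* TO 3000000]", fl="subject_id,type_id",
               sort="type_id asc", partitionKeys="type_id", qt="/export"),
        search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
               sort="triple_type_id asc", partitionKeys="triple_type_id", qt="/export"),
        on="type_id=triple_type_id"),
      sort="type_id asc",
      workers="2")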
It's also clear I'm not using streaming expressions optimally, so I need to do some more thinking! I don't want to stream all 26M triples (much less billions of docs) just for a simple join in which I expect a couple hundred results. I wanted to see if I could directly port a SQL join into this framework using normalized Solr docs and a single streaming expression. I'll do some more tinkering.

Thanks again, Ryan

On Sun, May 15, 2016 at 4:14 PM, Joel Bernstein <joels...@gmail.com> wrote:

> One other thing to keep in mind is how the partitioning is done when you add the partitionKeys.
>
> Partitioning is done using the HashQParserPlugin, which builds a filter for each worker. Under the covers this uses the normal filter query mechanism, so after the filters are built and cached they are effectively free from a performance standpoint. But on the first run they need to be built, and they need to be rebuilt after each commit. This means several things:
>
> 1) If you have 8 workers then 8 filters need to be computed. The workers call down to the shards in parallel, so the filters will build in parallel. But this can take time, and the larger the index, the more time it takes.
>
> 2) Like all filters, the partitioning filters can be pre-computed using warming queries. You can check the logs and look for the {!hash ...} filter queries to see the syntax. But basically you would need a warming query for each worker ID.
>
> 3) If you don't pre-warm the partitioning filters then there will be a performance penalty the first time they are computed. The next query will be much faster.
>
> 4) This is another area where having more shards helps with performance, because having fewer documents per shard means faster times building the partition filters.
>
> In the future we'll switch to segment-level partitioning filters, so that following each commit only the new segments need to be built. But this is still on the TODO list.
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Sun, May 15, 2016 at 5:38 PM, Joel Bernstein <joels...@gmail.com> wrote:
>
> > Ah, you also used 4 shards. That means with 8 workers there were 32 concurrent queries against the /select handler, each requesting 100,000 rows. That's a really heavy load!
> >
> > You can still try out the approach from my last email on the 4-shard setup; as you add workers you'll gradually ramp up the parallelism on the machine. With a single worker you'll have 4 shards working in parallel. With 8 workers you'll have 32 threads working in parallel.
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Sun, May 15, 2016 at 5:23 PM, Joel Bernstein <joels...@gmail.com> wrote:
> >
> >> Hi Ryan,
> >>
> >> The rows=100000 on the /select handler is likely going to cause problems with 8 workers. This is calling the /select handler with 8 concurrent workers, each retrieving 100,000 rows. The /select handler bogs down as the number of rows increases, so using the rows parameter with the /select handler is really not a strategy for limiting the size of the join. To limit the size of the join you would need to place some kind of filter on the query and still use the /export handler.
> >>
> >> The /export handler was developed to handle large exports and not get bogged down.
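(Note to self on Joel's point 2 above about pre-warming the {!hash ...} partition filters: I think it would look roughly like the solrconfig.xml entry below for the triple collection - one warming query per worker ID, 2 in my case, partitioned on type_id. The exact {!hash ...} parameters here are my assumption; the real syntax should be copied out of the logs as Joel suggests.

    <listener event="newSearcher" class="solr.QuerySenderListener">
      <arr name="queries">
        <lst>
          <str name="q">*:*</str>
          <str name="fq">{!hash workers=2 worker=0}</str>
          <str name="partitionKeys">type_id</str>
        </lst>
        <lst>
          <str name="q">*:*</str>
          <str name="fq">{!hash workers=2 worker=1}</str>
          <str name="partitionKeys">type_id</str>
        </lst>
      </arr>
    </listener>

Haven't tried this yet.)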
> >> You may want to start by just getting an understanding of how much data a single node can export, and how long it takes.
> >>
> >> 1) Try running a single *:* search() using the /export handler on the triple collection. Time how long it takes. If you run into problems getting this to complete, then attach a memory profiler. It may be that 8 gigs is not enough to hold the docValues in memory and process the query. The /export handler does not use more memory as the result set grows, so the /export handler should be able to process the entire query (30,000,000 docs). But it does take a lot of memory to hold the docValues fields in memory. This query will likely take some time to complete, though, as you are sorting and exporting 30,000,000 docs from a single node.
> >>
> >> 2) Then try running the same *:* search() against the /export handler in parallel(), gradually increasing the number of workers. Time how long it takes as you add workers and watch the load it places on the server. Eventually you'll max out your performance.
> >>
> >> Then you'll start to get an idea of how fast a single node can sort and export data.
> >>
> >> Joel Bernstein
> >> http://joelsolr.blogspot.com/
> >>
> >> On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
> >>
> >>> Hello, I'm running Solr on my laptop with -Xmx8g and gave each collection 4 shards and 2 replicas.
> >>>
> >>> Even grabbing 100k triple documents (like the following) is taking 20 seconds to complete and is prone to fall over. I could try this in a proper cluster with multiple hosts and more sharding, etc. I just thought I was tinkering with a small enough data set to use locally.
> >>>
> >>> parallel(
> >>>   triple,
> >>>   innerJoin(
> >>>     search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
> >>>            partitionKeys="type_id", rows="100000"),
> >>>     search(triple_type, q=*:*, fl="triple_type_id", sort="triple_type_id asc",
> >>>            partitionKeys="triple_type_id", qt="/export"),
> >>>     on="type_id=triple_type_id"
> >>>   ),
> >>>   sort="subject_id asc",
> >>>   workers="8")
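(Inline note: this expression is where things went wrong. The first search defaults to the /select handler and leans on rows="100000", so with workers="8" against 4 shards it becomes the 32 concurrent deep /select requests Joel describes above. What that leg should have been, per his replies, is roughly

    search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
           partitionKeys="type_id", qt="/export")

with no rows parameter at all, and any limiting done with a real filter query instead.)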
> >>> When Solr does crash, it's leaving messages like this:
> >>>
> >>> ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2 x:triple_shard3_replica2] org.apache.solr.common.SolrException; null:java.io.IOException: java.util.concurrent.TimeoutException: Idle timeout expired: 50001/50000 ms
> >>>   at org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
> >>>   at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
> >>>   at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
> >>>   at org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
> >>>   at java.io.OutputStream.write(OutputStream.java:116)
> >>>   at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >>>   at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> >>>   at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> >>>   at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> >>>   at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
> >>>   at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
> >>>   at org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
> >>>   at org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
> >>>   at org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
> >>>   at org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
> >>>   at org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
> >>>
> >>> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joels...@gmail.com> wrote:
> >>>
> >>> > Also the hashJoin is going to read the entire entity table into memory. If that's a large index, that could be using lots of memory.
> >>> >
> >>> > 25 million docs should be ok to /export from one node, as long as you have enough memory to load the docValues for the fields for sorting and exporting.
> >>> >
> >>> > Breaking down the query into its parts will show where the issue is. Also, adding more heap might give you enough memory.
> >>> >
> >>> > In my testing the max docs per second I've seen the /export handler push from a single node is 650,000. In order to get 650,000 docs per second on one node you have to partition the stream with workers. In my testing it took 8 workers hitting one node to achieve the 650,000 docs per second.
> >>> >
> >>> > But the numbers get big as the cluster grows. With 20 shards and 4 replicas and 32 workers, you could export 52,000,000 docs per second. With 40 shards, 5 replicas and 40 workers you could export 130,000,000 docs per second.
> >>> >
> >>> > So with large clusters you could do very large distributed joins with sub-second performance.
> >>> >
> >>> > Joel Bernstein
> >>> > http://joelsolr.blogspot.com/
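(Quick back-of-envelope on Joel's numbers above, assuming roughly 650,000 docs/sec per replica core and linear scaling because workers pull partitions from every replica at once:

    20 shards x 4 replicas =  80 cores  ->   80 x 650,000 ~=  52,000,000 docs/sec
    40 shards x 5 replicas = 200 cores  ->  200 x 650,000 ~= 130,000,000 docs/sec

so the quoted figures are straight linear scaling of the single-node rate.)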
> >>> > On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
> >>> >
> >>> > > Thanks very much for the advice. Yes, I'm running in a very basic single-shard environment. I thought that 25M docs was small enough not to require anything special, but I will try scaling like you suggest and let you know what happens.
> >>> > >
> >>> > > Cheers, Ryan
> >>> > >
> >>> > > On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com> wrote:
> >>> > >
> >>> > > > I would try breaking down the second query to see when the problems occur.
> >>> > > >
> >>> > > > 1) Start with just a single *:* search from one of the collections.
> >>> > > > 2) Then test the innerJoin. The innerJoin won't take much memory as it's a streaming merge join.
> >>> > > > 3) Then try the full thing.
> >>> > > >
> >>> > > > If you're running a large join like this all on one host then you might not have enough memory for the docValues and the two joins. In general, streaming is designed to scale by adding servers. It scales 3 ways:
> >>> > > >
> >>> > > > 1) Adding shards splits up the index for more pushing power.
> >>> > > > 2) Adding workers partitions the streams and splits up the join / merge work.
> >>> > > > 3) Adding replicas, once you have workers, adds pushing power. This is because workers will fetch partitions of the streams from across the entire cluster, so ALL replicas will be pushing at once.
> >>> > > >
> >>> > > > So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You can perform massive joins quickly.
> >>> > > >
> >>> > > > But for your scenario and available hardware you can experiment with different cluster sizes.
> >>> > > >
> >>> > > > Joel Bernstein
> >>> > > > http://joelsolr.blogspot.com/
> >>> > > >
> >>> > > > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
> >>> > > >
> >>> > > > > qt="/export" immediately fixed the query in Question #1. Sorry for missing that in the docs!
> >>> > > > >
> >>> > > > > The second query (with /export) crashes the server, so I was going to look at parallelization if you think that's a good idea. It also seems unwise to join into 26M docs, so maybe I can reconfigure the query to run along a happier path :-) The schema is very RDBMS-centric, so maybe that just won't ever work in this framework.
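(Inline note: spelling out Joel's break-it-down suggestion against my collections, these are roughly the three things I plan to time separately, all through /export:

    1) search(triple, q=*:*, fl="triple_id,subject_id,type_id",
              sort="type_id asc", qt="/export")

    2) innerJoin(
         search(triple, q=*:*, fl="triple_id,subject_id,type_id",
                sort="type_id asc", qt="/export"),
         search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
                sort="triple_type_id asc", qt="/export"),
         on="type_id=triple_type_id")

    3) the full hashJoin from Question #2 below, with qt="/export" added to every search.)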
> >>> > > > > Here's the log, but it's not very helpful:
> >>> > > > >
> >>> > > > > INFO - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1 x:triple_shard1_replica1] org.apache.solr.core.SolrCore; [triple_shard1_replica1] webapp=/solr path=/export params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2} hits=26305619 status=0 QTime=61
> >>> > > > >
> >>> > > > > INFO - 2016-05-13 23:18:13.747; [c:triple_type s:shard1 r:core_node1 x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore; [triple_type_shard1_replica1] webapp=/solr path=/export params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2} hits=702 status=0 QTime=2
> >>> > > > >
> >>> > > > > INFO - 2016-05-13 23:18:48.504; [ ] org.apache.solr.common.cloud.ConnectionManager; Watcher org.apache.solr.common.cloud.ConnectionManager@6ad0f304 name:ZooKeeperConnection Watcher:localhost:9983 got event WatchedEvent state:Disconnected type:None path:null path:null type:None
> >>> > > > >
> >>> > > > > INFO - 2016-05-13 23:18:48.504; [ ] org.apache.solr.common.cloud.ConnectionManager; zkClient has disconnected
> >>> > > > >
> >>> > > > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1 x:triple_shard1_replica1] org.apache.solr.common.SolrException; null:Early Client Disconnect
> >>> > > > >
> >>> > > > > WARN - 2016-05-13 23:18:51.431; [ ] org.apache.zookeeper.ClientCnxn$SendThread; Session 0x154ac66c81e0002 for server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing socket connection and attempting reconnect
> >>> > > > >
> >>> > > > > java.io.IOException: Connection reset by peer
> >>> > > > >   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> >>> > > > >   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> >>> > > > >   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> >>> > > > >   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> >>> > > > >   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >>> > > > >   at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> >>> > > > >   at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> >>> > > > >   at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> >>> > > > >
> >>> > > > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <joels...@gmail.com> wrote:
> >>> > > > >
> >>> > > > > > A couple of other things:
> >>> > > > > >
> >>> > > > > > 1) Your innerJoin can be parallelized across workers to improve performance. Take a look at the docs on the parallel function for the details.
> >>> > > > > >
> >>> > > > > > 2) It looks like you might be doing graph operations with joins. You might want to take a look at the gatherNodes function coming in 6.1:
> >>> > > > > >
> >>> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
> >>> > > > > >
> >>> > > > > > Joel Bernstein
> >>> > > > > > http://joelsolr.blogspot.com/
> >>> > > > > >
> >>> > > > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <joels...@gmail.com> wrote:
> >>> > > > > >
> >>> > > > > > > When doing things that require all the results (like joins) you need to specify the /export handler in the search function:
> >>> > > > > > >
> >>> > > > > > > qt="/export"
> >>> > > > > > >
> >>> > > > > > > The search function defaults to the /select handler, which is designed to return the top N results. The /export handler always returns all results that match the query. Also keep in mind that the /export handler requires that sort fields and fl fields have docValues set.
> >>> > > > > > >
> >>> > > > > > > Joel Bernstein
> >>> > > > > > > http://joelsolr.blogspot.com/
> >>> > > > > > >
> >>> > > > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
> >>> > > > > > >
> >>> > > > > > >> Question #1:
> >>> > > > > > >>
> >>> > > > > > >> The triple_type collection has a few hundred docs and triple has 25M docs.
> >>> > > > > > >>
> >>> > > > > > >> When I search for a particular subject_id in triple which I know has 14 results and do not pass in the 'rows' param, it returns 0 results:
> >>> > > > > > >>
> >>> > > > > > >> innerJoin(
> >>> > > > > > >>   search(triple, q=subject_id:1656521, fl="triple_id,subject_id,type_id",
> >>> > > > > > >>          sort="type_id asc"),
> >>> > > > > > >>   search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
> >>> > > > > > >>          sort="triple_type_id asc"),
> >>> > > > > > >>   on="type_id=triple_type_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> When I do the same search with rows=10000, it returns 14 results:
> >>> > > > > > >>
> >>> > > > > > >> innerJoin(
> >>> > > > > > >>   search(triple, q=subject_id:1656521, fl="triple_id,subject_id,type_id",
> >>> > > > > > >>          sort="type_id asc", rows=10000),
> >>> > > > > > >>   search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
> >>> > > > > > >>          sort="triple_type_id asc", rows=10000),
> >>> > > > > > >>   on="type_id=triple_type_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> Am I doing this right? Is there a magic number to pass into rows which says "give me all the results which match this query"?
> >>> > > > > > >>
> >>> > > > > > >> Question #2:
> >>> > > > > > >>
> >>> > > > > > >> Perhaps related to the first question, but I want to run the innerJoin() without the subject_id - rather, have it use the results of another query. But this does not return any results.
> >>> > > > > > >> I'm saying "search for this entity based on id, then use that result's entity_id as the subject_id to look through the triple/triple_type collections":
> >>> > > > > > >>
> >>> > > > > > >> hashJoin(
> >>> > > > > > >>   innerJoin(
> >>> > > > > > >>     search(triple, q=*:*, fl="triple_id,subject_id,type_id",
> >>> > > > > > >>            sort="type_id asc"),
> >>> > > > > > >>     search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
> >>> > > > > > >>            sort="triple_type_id asc"),
> >>> > > > > > >>     on="type_id=triple_type_id"
> >>> > > > > > >>   ),
> >>> > > > > > >>   hashed=search(entity,
> >>> > > > > > >>          q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
> >>> > > > > > >>          fl="entity_id,entity_label", sort="entity_id asc"),
> >>> > > > > > >>   on="subject_id=entity_id"
> >>> > > > > > >> )
> >>> > > > > > >>
> >>> > > > > > >> Am I doing this hashJoin right?
> >>> > > > > > >>
> >>> > > > > > >> Thanks very much, Ryan
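(For the record, the shape I'm experimenting with now for this query is roughly the sketch below: /export on every search, the innerJoin pushed into parallel() with partitionKeys, and - per Joel's point about limiting the join with a filter rather than rows - a real filter still to be worked out for the triple side so the whole 26M-doc collection doesn't have to stream.

    hashJoin(
      parallel(triple,
        innerJoin(
          search(triple, q=*:*, fl="triple_id,subject_id,type_id",
                 sort="type_id asc", partitionKeys="type_id", qt="/export"),
          search(triple_type, q=*:*, fl="triple_type_id,triple_type_label",
                 sort="triple_type_id asc", partitionKeys="triple_type_id", qt="/export"),
          on="type_id=triple_type_id"),
        sort="type_id asc",
        workers="2"),
      hashed=search(entity,
             q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
             fl="entity_id,entity_label", sort="entity_id asc", qt="/export"),
      on="subject_id=entity_id")

No promises this is the optimal arrangement - that's the tinkering I mentioned at the top. It also assumes docValues on all the sort/fl fields, including in the entity collection.)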