Ah, you also used 4 shards. That means with 8 workers there were 32 concurrent queries against the /select handler, each requesting 100,000 rows. That's a really heavy load!
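With the /export handler on both searches, your join would look roughly like this (an untested sketch based on your expression below; note there's no rows parameter, since /export always streams the full result set):

  parallel(
    triple,
    innerJoin(
      search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
             partitionKeys="type_id", qt="/export"),
      search(triple_type, q=*:*, fl="triple_type_id",
             sort="triple_type_id asc", partitionKeys="triple_type_id",
             qt="/export"),
      on="type_id=triple_type_id"
    ),
    sort="subject_id asc",
    workers="2")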
You can still try out the approach from my last email on the 4-shard setup: as you add workers you'll gradually ramp up the parallelism on the machine. With a single worker you'll have 4 shards working in parallel. With 8 workers you'll have 32 threads working in parallel.

Joel Bernstein
http://joelsolr.blogspot.com/

On Sun, May 15, 2016 at 5:23 PM, Joel Bernstein <joels...@gmail.com> wrote:

> Hi Ryan,
>
> The rows=100000 on the /select handler is likely going to cause problems
> with 8 workers. This is calling the /select handler with 8 concurrent
> workers, each retrieving 100,000 rows. The /select handler bogs down as
> the number of rows increases, so using the rows parameter with the
> /select handler is really not a strategy for limiting the size of the
> join. To limit the size of the join you would need to place some kind of
> filter on the query and still use the /export handler.
>
> The /export handler was developed to handle large exports without getting
> bogged down.
>
> You may want to start by just getting an understanding of how much data a
> single node can export, and how long it takes.
>
> 1) Try running a single *:* search() using the /export handler on the
> triple collection. Time how long it takes. If you run into problems
> getting this to complete, attach a memory profiler. It may be that 8 gigs
> is not enough to hold the docValues in memory and process the query. The
> /export handler does not use more memory as the result set grows, so it
> should be able to process the entire query (30,000,000 docs). But it does
> take a lot of memory to hold the docValues fields in memory. This query
> will likely take some time to complete, though, as you are sorting and
> exporting 30,000,000 docs from a single node.
>
> 2) Then try running the same *:* search() against the /export handler in
> parallel(), gradually increasing the number of workers. Time how long it
> takes as you add workers and watch the load it places on the server.
> Eventually you'll max out your performance. Rough sketches of both steps
> are below.
>
> Then you'll start to get an idea of how fast a single node can sort and
> export data.
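> For step 1, something like this (a sketch using the fields from your
> expression; the sort and fl fields need docValues):
>
>   search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
>          qt="/export")
>
> For step 2, the same search wrapped in parallel(), partitioned on the
> sort field:
>
>   parallel(triple,
>            search(triple, q=*:*, fl="subject_id,type_id",
>                   sort="type_id asc", partitionKeys="type_id",
>                   qt="/export"),
>            workers="2", sort="type_id asc")
>
> Start with workers="2" and increase from there.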
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
>
>> Hello, I'm running Solr on my laptop with -Xmx8g and gave each
>> collection 4 shards and 2 replicas.
>>
>> Even grabbing 100k triple documents (like the following) takes 20
>> seconds to complete and is prone to falling over. I could try this in a
>> proper cluster with multiple hosts and more sharding, etc. I just
>> thought I was tinkering with a small enough data set to use locally.
>>
>>   parallel(
>>     triple,
>>     innerJoin(
>>       search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
>>              partitionKeys="type_id", rows="100000"),
>>       search(triple_type, q=*:*, fl="triple_type_id",
>>              sort="triple_type_id asc", partitionKeys="triple_type_id",
>>              qt="/export"),
>>       on="type_id=triple_type_id"
>>     ),
>>     sort="subject_id asc",
>>     workers="8")
>>
>> When Solr does crash, it's leaving messages like this:
>>
>>   ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2
>>   x:triple_shard3_replica2] org.apache.solr.common.SolrException;
>>   null:java.io.IOException: java.util.concurrent.TimeoutException: Idle
>>   timeout expired: 50001/50000 ms
>>     at org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
>>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
>>     at org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
>>     at java.io.OutputStream.write(OutputStream.java:116)
>>     at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>>     at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
>>     at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
>>     at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
>>     at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
>>     at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
>>     at org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
>>     at org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
>>     at org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
>>     at org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
>>     at org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
>>
>> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joels...@gmail.com> wrote:
>>
>>> Also, the hashJoin is going to read the entire entity table into
>>> memory. If that's a large index, that could be using lots of memory.
>>>
>>> 25 million docs should be OK to /export from one node, as long as you
>>> have enough memory to load the docValues for the fields for sorting
>>> and exporting.
>>>
>>> Breaking down the query into its parts will show where the issue is.
>>> Also, adding more heap might give you enough memory.
>>>
>>> In my testing, the max docs per second I've seen the /export handler
>>> push from a single node is 650,000. In order to get 650,000 docs per
>>> second on one node you have to partition the stream with workers. In
>>> my testing it took 8 workers hitting one node to achieve the 650,000
>>> docs per second.
>>>
>>> But the numbers get big as the cluster grows. With 20 shards, 4
>>> replicas, and 32 workers, you could export 52,000,000 docs per second.
>>> With 40 shards, 5 replicas, and 40 workers you could export
>>> 130,000,000 docs per second.
>>>
>>> So with large clusters you could do very large distributed joins with
>>> sub-second performance.
>>>
>>> Joel Bernstein
>>> http://joelsolr.blogspot.com/
>>>
>>> On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
>>>
>>>> Thanks very much for the advice. Yes, I'm running in a very basic
>>>> single-shard environment. I thought that 25M docs was small enough
>>>> not to require anything special, but I will try scaling like you
>>>> suggest and let you know what happens.
>>>>
>>>> Cheers, Ryan
>>>>
>>>> On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com> wrote:
>>>>
>>>>> I would try breaking down the second query to see when the problems
>>>>> occur.
>>>>>
>>>>> 1) Start with just a single *:* search from one of the collections.
>>>>> 2) Then test the innerJoin. The innerJoin won't take much memory, as
>>>>> it's a streaming merge join.
>>>>> 3) Then try the full thing.
>>>>>
>>>>> If you're running a large join like this all on one host, then you
>>>>> might not have enough memory for the docValues and the two joins. In
>>>>> general, streaming is designed to scale by adding servers. It scales
>>>>> 3 ways:
>>>>>
>>>>> 1) Adding shards splits up the index for more pushing power.
>>>>> 2) Adding workers partitions the streams and splits up the
>>>>> join/merge work.
>>>>> 3) Adding replicas: when you have workers, you add pushing power by
>>>>> adding replicas. This is because workers will fetch partitions of
>>>>> the streams from across the entire cluster, so ALL replicas will be
>>>>> pushing at once.
>>>>>
>>>>> So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You
>>>>> can perform massive joins quickly.
>>>>>
>>>>> But for your scenario and available hardware you can experiment with
>>>>> different cluster sizes.
>>>>>
>>>>> Joel Bernstein
>>>>> http://joelsolr.blogspot.com/
>>>>>
>>>>> On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
>>>>>
>>>>>> qt="/export" immediately fixed the query in Question #1. Sorry for
>>>>>> missing that in the docs!
>>>>>>
>>>>>> The second query (with /export) crashes the server, so I was going
>>>>>> to look at parallelization if you think that's a good idea. It also
>>>>>> seems unwise to join into 26M docs, so maybe I can reconfigure the
>>>>>> query to run along a happier path :-) The schema is very
>>>>>> RDBMS-centric, so maybe that just won't ever work in this framework.
>>>>>>
>>>>>> Here's the log, but it's not very helpful:
>>>>>>
>>>>>>   INFO - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
>>>>>>   x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
>>>>>>   [triple_shard1_replica1] webapp=/solr path=/export
>>>>>>   params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
>>>>>>   hits=26305619 status=0 QTime=61
>>>>>>
>>>>>>   INFO - 2016-05-13 23:18:13.747; [c:triple_type s:shard1
>>>>>>   r:core_node1 x:triple_type_shard1_replica1]
>>>>>>   org.apache.solr.core.SolrCore; [triple_type_shard1_replica1]
>>>>>>   webapp=/solr path=/export
>>>>>>   params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
>>>>>>   hits=702 status=0 QTime=2
>>>>>>
>>>>>>   INFO - 2016-05-13 23:18:48.504; [ ]
>>>>>>   org.apache.solr.common.cloud.ConnectionManager; Watcher
>>>>>>   org.apache.solr.common.cloud.ConnectionManager@6ad0f304
>>>>>>   name:ZooKeeperConnection Watcher:localhost:9983 got event
>>>>>>   WatchedEvent state:Disconnected type:None path:null path:null
>>>>>>   type:None
>>>>>>
>>>>>>   INFO - 2016-05-13 23:18:48.504; [ ]
>>>>>>   org.apache.solr.common.cloud.ConnectionManager; zkClient has
>>>>>>   disconnected
>>>>>>
>>>>>>   ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
>>>>>>   x:triple_shard1_replica1] org.apache.solr.common.SolrException;
>>>>>>   null:Early Client Disconnect
>>>>>>
>>>>>>   WARN - 2016-05-13 23:18:51.431; [ ]
>>>>>>   org.apache.zookeeper.ClientCnxn$SendThread; Session
>>>>>>   0x154ac66c81e0002 for server localhost/0:0:0:0:0:0:0:1:9983,
>>>>>>   unexpected error, closing socket connection and attempting
>>>>>>   reconnect
>>>>>>   java.io.IOException: Connection reset by peer
>>>>>>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>>>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>>>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>>>>     at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
>>>>>>     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
>>>>>>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>>>>>>
>>>>>> On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <joels...@gmail.com> wrote:
>>>>>>
>>>>>>> A couple of other things:
>>>>>>>
>>>>>>> 1) Your innerJoin can be parallelized across workers to improve
>>>>>>> performance. Take a look at the docs on the parallel function for
>>>>>>> the details.
>>>>>>>
>>>>>>> 2) It looks like you might be doing graph operations with joins.
>>>>>>> You might want to take a look at the gatherNodes function coming
>>>>>>> in 6.1:
>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
>>>>>>>
>>>>>>> Joel Bernstein
>>>>>>> http://joelsolr.blogspot.com/
>>>>>>>
>>>>>>> On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <joels...@gmail.com> wrote:
>>>>>>>
>>>>>>>> When doing things that require all the results (like joins) you
>>>>>>>> need to specify the /export handler in the search function:
>>>>>>>>
>>>>>>>>   qt="/export"
>>>>>>>>
>>>>>>>> The search function defaults to the /select handler, which is
>>>>>>>> designed to return the top N results. The /export handler always
>>>>>>>> returns all results that match the query. Also keep in mind that
>>>>>>>> the /export handler requires that sort fields and fl fields have
>>>>>>>> docValues set.
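>>>>>>>> For example, the first search in your Question #1 would become
>>>>>>>> something like this (a sketch; the sort and fl fields here need
>>>>>>>> docValues in your schema):
>>>>>>>>
>>>>>>>>   search(triple, q=subject_id:1656521,
>>>>>>>>          fl="triple_id,subject_id,type_id",
>>>>>>>>          sort="type_id asc", qt="/export")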
>>>>>>>> Joel Bernstein
>>>>>>>> http://joelsolr.blogspot.com/
>>>>>>>>
>>>>>>>> On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Question #1:
>>>>>>>>>
>>>>>>>>> triple_type collection has a few hundred docs and triple has 25M
>>>>>>>>> docs.
>>>>>>>>>
>>>>>>>>> When I search for a particular subject_id in triple which I know
>>>>>>>>> has 14 results and do not pass in the 'rows' param, it returns 0
>>>>>>>>> results:
>>>>>>>>>
>>>>>>>>>   innerJoin(
>>>>>>>>>     search(triple, q=subject_id:1656521,
>>>>>>>>>            fl="triple_id,subject_id,type_id", sort="type_id asc"),
>>>>>>>>>     search(triple_type, q=*:*,
>>>>>>>>>            fl="triple_type_id,triple_type_label",
>>>>>>>>>            sort="triple_type_id asc"),
>>>>>>>>>     on="type_id=triple_type_id"
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>> When I do the same search with rows=10000, it returns 14 results:
>>>>>>>>>
>>>>>>>>>   innerJoin(
>>>>>>>>>     search(triple, q=subject_id:1656521,
>>>>>>>>>            fl="triple_id,subject_id,type_id", sort="type_id asc",
>>>>>>>>>            rows=10000),
>>>>>>>>>     search(triple_type, q=*:*,
>>>>>>>>>            fl="triple_type_id,triple_type_label",
>>>>>>>>>            sort="triple_type_id asc", rows=10000),
>>>>>>>>>     on="type_id=triple_type_id"
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>> Am I doing this right? Is there a magic number to pass into rows
>>>>>>>>> which says "give me all the results which match this query"?
>>>>>>>>>
>>>>>>>>> Question #2:
>>>>>>>>>
>>>>>>>>> Perhaps related to the first question, but I want to run the
>>>>>>>>> innerJoin() without the subject_id - rather, have it use the
>>>>>>>>> results of another query. But this does not return any results.
>>>>>>>>> I'm saying "search for this entity based on id, then use that
>>>>>>>>> result's entity_id as the subject_id to look through the
>>>>>>>>> triple/triple_type collections":
>>>>>>>>>
>>>>>>>>>   hashJoin(
>>>>>>>>>     innerJoin(
>>>>>>>>>       search(triple, q=*:*, fl="triple_id,subject_id,type_id",
>>>>>>>>>              sort="type_id asc"),
>>>>>>>>>       search(triple_type, q=*:*,
>>>>>>>>>              fl="triple_type_id,triple_type_label",
>>>>>>>>>              sort="triple_type_id asc"),
>>>>>>>>>       on="type_id=triple_type_id"
>>>>>>>>>     ),
>>>>>>>>>     hashed=search(entity,
>>>>>>>>>                   q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
>>>>>>>>>                   fl="entity_id,entity_label",
>>>>>>>>>                   sort="entity_id asc"),
>>>>>>>>>     on="subject_id=entity_id"
>>>>>>>>>   )
>>>>>>>>>
>>>>>>>>> Am I doing this hashJoin right?
>>>>>>>>>
>>>>>>>>> Thanks very much, Ryan