Ah, you also used 4 shards. That means with 8 workers there were 32
concurrent queries against the /select handler each requesting 100,000
rows. That's a really heavy load!

You can still try out the approach from my last email on the 4 shards
setup, as you add workers gradually you'll gradually ramp up the
parallelism on the machine. With a single worker you'll have 4 shards
working in parallel. With 8 works you'll have 32 threads working parallel.

Joel Bernstein
http://joelsolr.blogspot.com/

On Sun, May 15, 2016 at 5:23 PM, Joel Bernstein <joels...@gmail.com> wrote:

> Hi Ryan,
>
> The rows=100000 on the /select handler is likely going to cause problems
> with 8 workers. This is calling the /select handler with 8 concurrent
> workers each retrieving 100,000 rows. The /select handler bogs down as the
> number of rows increases. So using the rows parameter with the /select
> handler is really not a strategy for limiting the size of the join. To
> limit the size of the join you would need to place some kind of filter on
> the query and still use the /export handler.
>
> The /export handler was developed to handle large exports and not get
> bogged down.
>
> You may want to start just getting an understanding of how much data a
> single node can export, and how long it takes.
>
> 1) Try running a single *:* search() using the /export handler on the
> triple collection. Time how long it takes. If you run into problems getting
> this to complete then attach a memory profiler. It may be that 8 gigs is
> not enough to hold the docValues in memory and process the query. The
> /export handler does not use more memory as the result set rises, so the
> /export handler should be able process the entire query (30,000,000 docs).
> But it does take a lot of memory to hold the docValues fields in memory.
> This query will likely take some time to complete though as you are sorting
> and exporting 30,000,000 million docs from a single node.
>
> 2) Then try running the same *:* search() against the /export handler in
> parallel() gradually increasing the number of workers. Time how long it
> takes as you add workers and watch the load it places on the server.
> Eventually you'll max out your performance.
>
>
> Then you'll start to get an idea of how fast a single node can sort and
> export data.
>
>
>
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancut...@gmail.com> wrote:
>
>> Hello, I'm running Solr on my laptop with -Xmx8g and gave each collection
>> 4
>> shards and 2 replicas.
>>
>> Even grabbing 100k triple documents (like the following) is taking 20
>> seconds to complete and prone to fall over.  I could try this in a proper
>> cluster with multiple hosts and more sharding, etc.  I just thought I was
>> tinkering with a small enough data set to use locally.
>>
>> parallel(
>>     triple,
>>     innerJoin(
>>       search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
>> partitionKeys="type_id", rows="100000"),
>>       search(triple_type, q=*:*, fl="triple_type_id", sort="triple_type_id
>> asc", partitionKeys="triple_type_id", qt="/export"),
>>       on="type_id=triple_type_id"
>>     ),
>>     sort="subject_id asc",
>>     workers="8")
>>
>>
>> When Solr does crash, it's leaving messages like this.
>>
>> ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2
>> x:triple_shard3_replica2] org.apache.solr.common.SolrException;
>> null:java.io.IOException: java.util.concurrent.TimeoutException: Idle
>> timeout expired: 50001/50000 ms
>>
>> at
>>
>> org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
>>
>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
>>
>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
>>
>> at
>>
>> org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
>>
>> at java.io.OutputStream.write(OutputStream.java:116)
>>
>> at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>>
>> at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
>>
>> at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
>>
>> at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
>>
>> at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
>>
>> at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
>>
>> at
>>
>> org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
>>
>> at
>>
>> org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
>>
>> at
>>
>> org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
>>
>> at
>>
>> org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
>>
>> at
>>
>> org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
>>
>> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joels...@gmail.com>
>> wrote:
>>
>> > Also the hashJoin is going to read the entire entity table into memory.
>> If
>> > that's a large index that could be using lots of memory.
>> >
>> > 25 million docs should be ok to /export from one node, as long as you
>> have
>> > enough memory to load the docValues for the fields for sorting and
>> > exporting.
>> >
>> > Breaking down the query into it's parts will show where the issue is.
>> Also
>> > adding more heap might give you enough memory.
>> >
>> > In my testing the max docs per second I've seen the /export handler push
>> > from a single node is 650,000. In order to get 650,000 docs per second
>> on
>> > one node you have to partition the stream with workers. In my testing it
>> > took 8 workers hitting one node to achieve the 650,000 docs per second.
>> >
>> > But the numbers get big as the cluster grows. With 20 shards and 4
>> replicas
>> > and 32 workers, you could export 52,000,000 docs per-second. With 40
>> > shards, 5 replicas and 40 workers you could export 130,000,000 docs per
>> > second.
>> >
>> > So with large clusters you could do very large distributed joins with
>> > sub-second performance.
>> >
>> >
>> >
>> >
>> > Joel Bernstein
>> > http://joelsolr.blogspot.com/
>> >
>> > On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com>
>> wrote:
>> >
>> > > Thanks very much for the advice.  Yes, I'm running in a very basic
>> single
>> > > shard environment.  I thought that 25M docs was small enough to not
>> > require
>> > > anything special but I will try scaling like you suggest and let you
>> know
>> > > what happens.
>> > >
>> > > Cheers, Ryan
>> > >
>> > > On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com>
>> > > wrote:
>> > >
>> > > > I would try breaking down the second query to see when the problems
>> > > occur.
>> > > >
>> > > > 1) Start with just a single *:* search from one of the collections.
>> > > > 2) Then test the innerJoin. The innerJoin won't take much memory as
>> > it's
>> > > a
>> > > > streaming merge join.
>> > > > 3) Then try the full thing.
>> > > >
>> > > > If you're running a large join like this all on one host then you
>> might
>> > > not
>> > > > have enough memory for the docValues and the two joins. In general
>> > > > streaming is designed to scale by adding servers. It scales 3 ways:
>> > > >
>> > > > 1) Adding shards, splits up the index for more pushing power.
>> > > > 2) Adding workers, partitions the streams and splits up the join /
>> > merge
>> > > > work.
>> > > > 3) Adding replicas, when you have workers you will add pushing
>> power by
>> > > > adding replicas. This is because workers will fetch partitions of
>> the
>> > > > streams from across the entire cluster. So ALL replicas will be
>> pushing
>> > > at
>> > > > once.
>> > > >
>> > > > So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You
>> can
>> > > > perform massive joins quickly.
>> > > >
>> > > > But for you're scenario and available hardware you can experiment
>> with
>> > > > different cluster sizes.
>> > > >
>> > > >
>> > > >
>> > > > Joel Bernstein
>> > > > http://joelsolr.blogspot.com/
>> > > >
>> > > > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > qt="/export" immediately fixed the query in Question #1.  Sorry
>> for
>> > > > missing
>> > > > > that in the docs!
>> > > > >
>> > > > > The second query (with /export) crashes the server so I was going
>> to
>> > > look
>> > > > > at parallelization if you think that's a good idea.  It also seems
>> > > unwise
>> > > > > to joining into 26M docs so maybe I can reconfigure the query to
>> run
>> > > > along
>> > > > > a more happy path :-)  The schema is very RDBMS-centric so maybe
>> that
>> > > > just
>> > > > > won't ever work in this framework.
>> > > > >
>> > > > > Here's the log but it's not very helpful.
>> > > > >
>> > > > >
>> > > > > INFO  - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
>> > > > > x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
>> > > > > [triple_shard1_replica1]  webapp=/solr path=/export
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
>> > > > > hits=26305619 status=0 QTime=61
>> > > > >
>> > > > > INFO  - 2016-05-13 23:18:13.747; [c:triple_type s:shard1
>> r:core_node1
>> > > > > x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore;
>> > > > > [triple_type_shard1_replica1]  webapp=/solr path=/export
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
>> > > > > hits=702 status=0 QTime=2
>> > > > >
>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
>> > > > > org.apache.solr.common.cloud.ConnectionManager; Watcher
>> > > > > org.apache.solr.common.cloud.ConnectionManager@6ad0f304
>> > > > > name:ZooKeeperConnection Watcher:localhost:9983 got event
>> > WatchedEvent
>> > > > > state:Disconnected type:None path:null path:null type:None
>> > > > >
>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
>> > > > > org.apache.solr.common.cloud.ConnectionManager; zkClient has
>> > > disconnected
>> > > > >
>> > > > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
>> > > > > x:triple_shard1_replica1] org.apache.solr.common.SolrException;
>> > > > null:Early
>> > > > > Client Disconnect
>> > > > >
>> > > > > WARN  - 2016-05-13 23:18:51.431; [   ]
>> > > > > org.apache.zookeeper.ClientCnxn$SendThread; Session
>> 0x154ac66c81e0002
>> > > for
>> > > > > server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing
>> > socket
>> > > > > connection and attempting reconnect
>> > > > >
>> > > > > java.io.IOException: Connection reset by peer
>> > > > >
>> > > > >         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> > > > >
>> > > > >         at
>> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> > > > >
>> > > > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>> > > > >
>> > > > >         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>> > > > >
>> > > > >         at
>> > > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>> > > > >
>> > > > >         at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
>> > > > >
>> > > > >         at
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
>> > > > >
>> > > > >         at
>> > > > >
>> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>> > > > >
>> > > > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <
>> joels...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > A couple of other things:
>> > > > > >
>> > > > > > 1) Your innerJoin can parallelized across workers to improve
>> > > > performance.
>> > > > > > Take a look at the docs on the parallel function for the
>> details.
>> > > > > >
>> > > > > > 2) It looks like you might be doing graph operations with joins.
>> > You
>> > > > > might
>> > > > > > to take a look at the gatherNodes function coming in 6.1:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
>> > > > > >
>> > > > > > Joel Bernstein
>> > > > > > http://joelsolr.blogspot.com/
>> > > > > >
>> > > > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <
>> > joels...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > When doing things that require all the results (like joins)
>> you
>> > > need
>> > > > to
>> > > > > > > specify the /export handler in the search function.
>> > > > > > >
>> > > > > > > qt="/export"
>> > > > > > >
>> > > > > > > The search function defaults to the /select handler which is
>> > > designed
>> > > > > to
>> > > > > > > return the top N results. The /export handler always returns
>> all
>> > > > > results
>> > > > > > > that match the query. Also keep in mind that the /export
>> handler
>> > > > > requires
>> > > > > > > that sort fields and fl fields have docValues set.
>> > > > > > >
>> > > > > > > Joel Bernstein
>> > > > > > > http://joelsolr.blogspot.com/
>> > > > > > >
>> > > > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <
>> > ryancut...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > >> Question #1:
>> > > > > > >>
>> > > > > > >> triple_type collection has a few hundred docs and triple has
>> 25M
>> > > > docs.
>> > > > > > >>
>> > > > > > >> When I search for a particular subject_id in triple which I
>> know
>> > > has
>> > > > > 14
>> > > > > > >> results and do not pass in 'rows' params, it returns 0
>> results:
>> > > > > > >>
>> > > > > > >> innerJoin(
>> > > > > > >>     search(triple, q=subject_id:1656521,
>> > > > > > >> fl="triple_id,subject_id,type_id",
>> > > > > > >> sort="type_id asc"),
>> > > > > > >>     search(triple_type, q=*:*,
>> > > > fl="triple_type_id,triple_type_label",
>> > > > > > >> sort="triple_type_id asc"),
>> > > > > > >>     on="type_id=triple_type_id"
>> > > > > > >> )
>> > > > > > >>
>> > > > > > >> When I do the same search with rows=10000, it returns 14
>> > results:
>> > > > > > >>
>> > > > > > >> innerJoin(
>> > > > > > >>     search(triple, q=subject_id:1656521,
>> > > > > > >> fl="triple_id,subject_id,type_id",
>> > > > > > >> sort="type_id asc", rows=10000),
>> > > > > > >>     search(triple_type, q=*:*,
>> > > > fl="triple_type_id,triple_type_label",
>> > > > > > >> sort="triple_type_id asc", rows=10000),
>> > > > > > >>     on="type_id=triple_type_id"
>> > > > > > >> )
>> > > > > > >>
>> > > > > > >> Am I doing this right?  Is there a magic number to pass into
>> > rows
>> > > > > which
>> > > > > > >> says "give me all the results which match this query"?
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> Question #2:
>> > > > > > >>
>> > > > > > >> Perhaps related to the first question but I want to run the
>> > > > > innerJoin()
>> > > > > > >> without the subject_id - rather have it use the results of
>> > another
>> > > > > > query.
>> > > > > > >> But this does not return any results.  I'm saying "search for
>> > this
>> > > > > > entity
>> > > > > > >> based on id then use that result's entity_id as the
>> subject_id
>> > to
>> > > > look
>> > > > > > >> through the triple/triple_type collections:
>> > > > > > >>
>> > > > > > >> hashJoin(
>> > > > > > >>     innerJoin(
>> > > > > > >>         search(triple, q=*:*,
>> fl="triple_id,subject_id,type_id",
>> > > > > > >> sort="type_id asc"),
>> > > > > > >>         search(triple_type, q=*:*,
>> > > > > > fl="triple_type_id,triple_type_label",
>> > > > > > >> sort="triple_type_id asc"),
>> > > > > > >>         on="type_id=triple_type_id"
>> > > > > > >>     ),
>> > > > > > >>     hashed=search(entity,
>> > > > > > >>
>> q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
>> > > > > > >> fl="entity_id,entity_label", sort="entity_id asc"),
>> > > > > > >>     on="subject_id=entity_id"
>> > > > > > >> )
>> > > > > > >>
>> > > > > > >> Am I using doing this hashJoin right?
>> > > > > > >>
>> > > > > > >> Thanks very much, Ryan
>> > > > > > >>
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to