One other thing to keep in mind is how the partitioning is done when you
add the partitionKeys.

Partitioning is done using the HashQParserPlugin, which builds a filter for
each worker. Under the covers this uses the normal filter query mechanism,
so once the filters are built and cached they are effectively free from a
performance standpoint. But on the first run they need to be built, and
they need to be rebuilt after each commit. This means several things:

1) If you have 8 workers then 8 filters need to be computed. The workers
call down to the shards in parallel, so the filters are built in parallel.
But this can still take time, and the larger the index, the longer it takes.

2) Like all filters, the partitioning filters can be pre-computed using
warming queries. Check the logs for the {!hash ...} filter queries to see
the exact syntax. Basically you need one warming query per worker ID (see
the sketch after this list).

3) If you don't pre-warm the partitioning filters then there will be a
performance penalty the first time they are computed. The next query will
be much faster.

4) This is another area where having more shards helps with performance:
fewer documents per shard means faster partition filter builds.
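
To illustrate point 2, here is a rough sketch of what the warming entries
could look like in solrconfig.xml, assuming 8 workers and
partitionKeys=type_id as in the expression further down the thread. The
exact {!hash ...} local params should be copied from the filter queries you
see in your own logs:

<listener event="newSearcher" class="solr.QuerySenderListener">
  <arr name="queries">
    <lst>
      <str name="q">*:*</str>
      <str name="fq">{!hash workers=8 worker=0}</str>
      <str name="partitionKeys">type_id</str>
    </lst>
    <lst>
      <str name="q">*:*</str>
      <str name="fq">{!hash workers=8 worker=1}</str>
      <str name="partitionKeys">type_id</str>
    </lst>
    <!-- ...and so on, one entry per worker ID, up to worker=7 -->
  </arr>
</listener>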

In the future we'll switch to segment-level partitioning filters, so that
after each commit only the filters for the new segments need to be built.
But this is still on the TODO list.


Joel Bernstein
http://joelsolr.blogspot.com/

On Sun, May 15, 2016 at 5:38 PM, Joel Bernstein <joels...@gmail.com> wrote:

> Ah, you also used 4 shards. That means with 8 workers there were 32
> concurrent queries against the /select handler each requesting 100,000
> rows. That's a really heavy load!
>
> You can still try out the approach from my last email on the 4-shard
> setup; as you add workers you'll gradually ramp up the parallelism on the
> machine. With a single worker you'll have 4 shards working in parallel.
> With 8 workers you'll have 32 threads working in parallel.
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Sun, May 15, 2016 at 5:23 PM, Joel Bernstein <joels...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> The rows=100000 on the /select handler is likely going to cause problems
>> with 8 workers. This is calling the /select handler with 8 concurrent
>> workers each retrieving 100,000 rows. The /select handler bogs down as the
>> number of rows increases. So using the rows parameter with the /select
>> handler is really not a strategy for limiting the size of the join. To
>> limit the size of the join you would need to place some kind of filter on
>> the query and still use the /export handler.
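>>
>> For example (just a sketch -- the fq value here is made up, use whatever
>> filter makes sense for your data):
>>
>> search(triple,
>>        q=*:*,
>>        fq="type_id:(1 OR 2 OR 3)",
>>        fl="subject_id,type_id",
>>        sort="type_id asc",
>>        partitionKeys="type_id",
>>        qt="/export")
>>
>> I believe the search function passes fq through to the handler; if not,
>> you can fold the filter into the main query instead.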
>>
>> The /export handler was developed to handle large exports and not get
>> bogged down.
>>
>> You may want to start just getting an understanding of how much data a
>> single node can export, and how long it takes.
>>
>> 1) Try running a single *:* search() using the /export handler on the
>> triple collection (see the first sketch below). Time how long it takes. If
>> you run into problems getting this to complete then attach a memory
>> profiler. It may be that 8 gigs is not enough to hold the docValues in
>> memory and process the query. The /export handler does not use more memory
>> as the result set rises, so it should be able to process the entire query
>> (30,000,000 docs). But it does take a lot of memory to hold the docValues
>> fields in memory. This query will likely take some time to complete,
>> though, as you are sorting and exporting 30,000,000 docs from a single
>> node.
>>
>> 2) Then try running the same *:* search() against the /export handler in
>> parallel(), gradually increasing the number of workers (second sketch
>> below). Time how long it takes as you add workers and watch the load it
>> places on the server. Eventually you'll max out your performance.
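>>
>> For reference, here are rough sketches of 1) and 2). Field names are
>> borrowed from your earlier expressions -- adjust to whatever you actually
>> sort and export on:
>>
>> 1) Single export from the triple collection:
>>
>> search(triple,
>>        q=*:*,
>>        fl="subject_id,type_id",
>>        sort="type_id asc",
>>        qt="/export")
>>
>> 2) The same export wrapped in parallel(), starting with 2 workers and
>> working upward:
>>
>> parallel(triple,
>>          search(triple,
>>                 q=*:*,
>>                 fl="subject_id,type_id",
>>                 sort="type_id asc",
>>                 partitionKeys="type_id",
>>                 qt="/export"),
>>          sort="type_id asc",
>>          workers="2")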
>>
>>
>> Then you'll start to get an idea of how fast a single node can sort and
>> export data.
>>
>>
>>
>>
>> Joel Bernstein
>> http://joelsolr.blogspot.com/
>>
>> On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancut...@gmail.com>
>> wrote:
>>
>>> Hello, I'm running Solr on my laptop with -Xmx8g and gave each collection
>>> 4 shards and 2 replicas.
>>>
>>> Even grabbing 100k triple documents (like the following) is taking 20
>>> seconds to complete and is prone to fall over.  I could try this in a proper
>>> cluster with multiple hosts and more sharding, etc.  I just thought I was
>>> tinkering with a small enough data set to use locally.
>>>
>>> parallel(
>>>     triple,
>>>     innerJoin(
>>>       search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
>>> partitionKeys="type_id", rows="100000"),
>>>       search(triple_type, q=*:*, fl="triple_type_id",
>>> sort="triple_type_id
>>> asc", partitionKeys="triple_type_id", qt="/export"),
>>>       on="type_id=triple_type_id"
>>>     ),
>>>     sort="subject_id asc",
>>>     workers="8")
>>>
>>>
>>> When Solr does crash, it's leaving messages like this.
>>>
>>> ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2
>>> x:triple_shard3_replica2] org.apache.solr.common.SolrException;
>>> null:java.io.IOException: java.util.concurrent.TimeoutException: Idle
>>> timeout expired: 50001/50000 ms
>>>
>>> at
>>>
>>> org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
>>>
>>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
>>>
>>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
>>>
>>> at
>>>
>>> org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
>>>
>>> at java.io.OutputStream.write(OutputStream.java:116)
>>>
>>> at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>>>
>>> at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
>>>
>>> at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
>>>
>>> at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
>>>
>>> at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
>>>
>>> at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
>>>
>>> at
>>>
>>> org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
>>>
>>> at
>>>
>>> org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
>>>
>>> at
>>>
>>> org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
>>>
>>> at
>>>
>>> org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
>>>
>>> at
>>>
>>> org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
>>>
>>> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joels...@gmail.com>
>>> wrote:
>>>
>>> > Also the hashJoin is going to read the entire entity table into memory.
>>> > If that's a large index, that could be using lots of memory.
>>> >
>>> > 25 million docs should be ok to /export from one node, as long as you
>>> > have enough memory to load the docValues for the fields for sorting and
>>> > exporting.
>>> >
>>> > Breaking down the query into its parts will show where the issue is.
>>> > Also, adding more heap might give you enough memory.
>>> >
>>> > In my testing the max docs per second I've seen the /export handler push
>>> > from a single node is 650,000. In order to get 650,000 docs per second on
>>> > one node you have to partition the stream with workers. In my testing it
>>> > took 8 workers hitting one node to achieve the 650,000 docs per second.
>>> >
>>> > But the numbers get big as the cluster grows. With 20 shards and 4
>>> > replicas and 32 workers, you could export 52,000,000 docs per second.
>>> > With 40 shards, 5 replicas and 40 workers you could export 130,000,000
>>> > docs per second.
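>>> > (Presumably the arithmetic there is that every replica pushes at once:
>>> > 20 shards x 4 replicas = 80 replicas x 650,000 docs/sec = 52,000,000,
>>> > and 40 shards x 5 replicas = 200 replicas x 650,000 = 130,000,000.)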
>>> >
>>> > So with large clusters you could do very large distributed joins with
>>> > sub-second performance.
>>> >
>>> >
>>> >
>>> >
>>> > Joel Bernstein
>>> > http://joelsolr.blogspot.com/
>>> >
>>> > On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com>
>>> wrote:
>>> >
>>> > > Thanks very much for the advice.  Yes, I'm running in a very basic
>>> > > single-shard environment.  I thought that 25M docs was small enough to
>>> > > not require anything special, but I will try scaling like you suggest
>>> > > and let you know what happens.
>>> > >
>>> > > Cheers, Ryan
>>> > >
>>> > > On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > I would try breaking down the second query to see when the problems
>>> > > > occur.
>>> > > >
>>> > > > 1) Start with just a single *:* search from one of the collections.
>>> > > > 2) Then test the innerJoin. The innerJoin won't take much memory as
>>> > > > it's a streaming merge join.
>>> > > > 3) Then try the full thing.
>>> > > >
>>> > > > If you're running a large join like this all on one host then you
>>> > > > might not have enough memory for the docValues and the two joins. In
>>> > > > general, streaming is designed to scale by adding servers. It scales
>>> > > > 3 ways:
>>> > > >
>>> > > > 1) Adding shards splits up the index for more pushing power.
>>> > > > 2) Adding workers partitions the streams and splits up the join /
>>> > > > merge work.
>>> > > > 3) Adding replicas: when you have workers, adding replicas adds
>>> > > > pushing power. This is because workers will fetch partitions of the
>>> > > > streams from across the entire cluster, so ALL replicas will be
>>> > > > pushing at once.
>>> > > >
>>> > > > So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You
>>> > > > can perform massive joins quickly.
>>> > > >
>>> > > > But for your scenario and available hardware you can experiment with
>>> > > > different cluster sizes.
>>> > > >
>>> > > >
>>> > > >
>>> > > > Joel Bernstein
>>> > > > http://joelsolr.blogspot.com/
>>> > > >
>>> > > > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com
>>> >
>>> > > wrote:
>>> > > >
>>> > > > > qt="/export" immediately fixed the query in Question #1.  Sorry
>>> > > > > for missing that in the docs!
>>> > > > >
>>> > > > > The second query (with /export) crashes the server so I was going
>>> > > > > to look at parallelization if you think that's a good idea.  It
>>> > > > > also seems unwise to be joining into 26M docs, so maybe I can
>>> > > > > reconfigure the query to run along a happier path :-)  The schema
>>> > > > > is very RDBMS-centric so maybe that just won't ever work in this
>>> > > > > framework.
>>> > > > >
>>> > > > > Here's the log but it's not very helpful.
>>> > > > >
>>> > > > >
>>> > > > > INFO  - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
>>> > > > > x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
>>> > > > > [triple_shard1_replica1]  webapp=/solr path=/export
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
>>> > > > > hits=26305619 status=0 QTime=61
>>> > > > >
>>> > > > > INFO  - 2016-05-13 23:18:13.747; [c:triple_type s:shard1
>>> r:core_node1
>>> > > > > x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore;
>>> > > > > [triple_type_shard1_replica1]  webapp=/solr path=/export
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
>>> > > > > hits=702 status=0 QTime=2
>>> > > > >
>>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
>>> > > > > org.apache.solr.common.cloud.ConnectionManager; Watcher
>>> > > > > org.apache.solr.common.cloud.ConnectionManager@6ad0f304
>>> > > > > name:ZooKeeperConnection Watcher:localhost:9983 got event
>>> > WatchedEvent
>>> > > > > state:Disconnected type:None path:null path:null type:None
>>> > > > >
>>> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
>>> > > > > org.apache.solr.common.cloud.ConnectionManager; zkClient has
>>> > > disconnected
>>> > > > >
>>> > > > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
>>> > > > > x:triple_shard1_replica1] org.apache.solr.common.SolrException;
>>> > > > null:Early
>>> > > > > Client Disconnect
>>> > > > >
>>> > > > > WARN  - 2016-05-13 23:18:51.431; [   ]
>>> > > > > org.apache.zookeeper.ClientCnxn$SendThread; Session
>>> 0x154ac66c81e0002
>>> > > for
>>> > > > > server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing
>>> > socket
>>> > > > > connection and attempting reconnect
>>> > > > >
>>> > > > > java.io.IOException: Connection reset by peer
>>> > > > >
>>> > > > >         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>> > > > >
>>> > > > >         at
>>> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>> > > > >
>>> > > > >         at
>>> sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>> > > > >
>>> > > > >         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>> > > > >
>>> > > > >         at
>>> > > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>> > > > >
>>> > > > >         at
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
>>> > > > >
>>> > > > >         at
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
>>> > > > >
>>> > > > >         at
>>> > > > >
>>> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>>> > > > >
>>> > > > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <
>>> joels...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > A couple of other things:
>>> > > > > >
>>> > > > > > 1) Your innerJoin can be parallelized across workers to improve
>>> > > > > > performance. Take a look at the docs on the parallel function for
>>> > > > > > the details.
>>> > > > > >
>>> > > > > > 2) It looks like you might be doing graph operations with joins.
>>> > > > > > You might want to take a look at the gatherNodes function coming
>>> > > > > > in 6.1:
>>> > > > > >
>>> > > > > >
>>> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
>>> > > > > >
>>> > > > > > Joel Bernstein
>>> > > > > > http://joelsolr.blogspot.com/
>>> > > > > >
>>> > > > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <
>>> > joels...@gmail.com>
>>> > > > > > wrote:
>>> > > > > >
>>> > > > > > > When doing things that require all the results (like joins) you
>>> > > > > > > need to specify the /export handler in the search function.
>>> > > > > > >
>>> > > > > > > qt="/export"
>>> > > > > > >
>>> > > > > > > The search function defaults to the /select handler, which is
>>> > > > > > > designed to return the top N results. The /export handler always
>>> > > > > > > returns all results that match the query. Also keep in mind that
>>> > > > > > > the /export handler requires that sort fields and fl fields have
>>> > > > > > > docValues set.
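>>> > > > > > >
>>> > > > > > > For example, a schema.xml entry along these lines (the field
>>> > > > > > > name is borrowed from this thread, not your actual schema):
>>> > > > > > >
>>> > > > > > >   <field name="type_id" type="string" indexed="true" stored="true" docValues="true"/>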
>>> > > > > > >
>>> > > > > > > Joel Bernstein
>>> > > > > > > http://joelsolr.blogspot.com/
>>> > > > > > >
>>> > > > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <
>>> > ryancut...@gmail.com
>>> > > >
>>> > > > > > wrote:
>>> > > > > > >
>>> > > > > > >> Question #1:
>>> > > > > > >>
>>> > > > > > >> triple_type collection has a few hundred docs and triple has
>>> > > > > > >> 25M docs.
>>> > > > > > >>
>>> > > > > > >> When I search for a particular subject_id in triple which I
>>> > > > > > >> know has 14 results and do not pass in 'rows' params, it
>>> > > > > > >> returns 0 results:
>>> > > > > > >>
>>> > > > > > >> innerJoin(
>>> > > > > > >>     search(triple, q=subject_id:1656521,
>>> > > > > > >> fl="triple_id,subject_id,type_id",
>>> > > > > > >> sort="type_id asc"),
>>> > > > > > >>     search(triple_type, q=*:*,
>>> > > > fl="triple_type_id,triple_type_label",
>>> > > > > > >> sort="triple_type_id asc"),
>>> > > > > > >>     on="type_id=triple_type_id"
>>> > > > > > >> )
>>> > > > > > >>
>>> > > > > > >> When I do the same search with rows=10000, it returns 14
>>> > > > > > >> results:
>>> > > > > > >>
>>> > > > > > >> innerJoin(
>>> > > > > > >>     search(triple, q=subject_id:1656521,
>>> > > > > > >> fl="triple_id,subject_id,type_id",
>>> > > > > > >> sort="type_id asc", rows=10000),
>>> > > > > > >>     search(triple_type, q=*:*,
>>> > > > fl="triple_type_id,triple_type_label",
>>> > > > > > >> sort="triple_type_id asc", rows=10000),
>>> > > > > > >>     on="type_id=triple_type_id"
>>> > > > > > >> )
>>> > > > > > >>
>>> > > > > > >> Am I doing this right?  Is there a magic number to pass into
>>> > > > > > >> rows which says "give me all the results which match this
>>> > > > > > >> query"?
>>> > > > > > >>
>>> > > > > > >>
>>> > > > > > >> Question #2:
>>> > > > > > >>
>>> > > > > > >> Perhaps related to the first question, but I want to run the
>>> > > > > > >> innerJoin() without the subject_id - rather have it use the
>>> > > > > > >> results of another query. But this does not return any results.
>>> > > > > > >> I'm saying "search for this entity based on id, then use that
>>> > > > > > >> result's entity_id as the subject_id to look through the
>>> > > > > > >> triple/triple_type collections:"
>>> > > > > > >>
>>> > > > > > >> hashJoin(
>>> > > > > > >>     innerJoin(
>>> > > > > > >>         search(triple, q=*:*,
>>> fl="triple_id,subject_id,type_id",
>>> > > > > > >> sort="type_id asc"),
>>> > > > > > >>         search(triple_type, q=*:*,
>>> > > > > > fl="triple_type_id,triple_type_label",
>>> > > > > > >> sort="triple_type_id asc"),
>>> > > > > > >>         on="type_id=triple_type_id"
>>> > > > > > >>     ),
>>> > > > > > >>     hashed=search(entity,
>>> > > > > > >>
>>> q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
>>> > > > > > >> fl="entity_id,entity_label", sort="entity_id asc"),
>>> > > > > > >>     on="subject_id=entity_id"
>>> > > > > > >> )
>>> > > > > > >>
>>> > > > > > >> Am I doing this hashJoin right?
>>> > > > > > >>
>>> > > > > > >> Thanks very much, Ryan
>>> > > > > > >>
