Susmit,

You could wrap a LimitStream around the outside of all the relational
algebra. For example:

parallel(limit(intersect(intersect(search, search), union(search,
search))))

In this scenario the limit would happen on the workers.
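
The LimitStream doesn't ship with Solr, so it would be a custom stream. A
minimal sketch of one (my own naming, not an official API) is below. Note
that to run inside parallel() it would also need the Expressible plumbing
(a StreamFactory constructor and toExpression) and to be registered with
the workers' StreamFactory:

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

// Hypothetical LimitStream: passes through at most `limit` tuples, then
// returns the EOF tuple so the workers close the underlying streams.
public class LimitStream extends TupleStream {

  private final TupleStream stream;
  private final int limit;
  private int count;

  public LimitStream(TupleStream stream, int limit) {
    this.stream = stream;
    this.limit = limit;
  }

  public void setStreamContext(StreamContext context) {
    stream.setStreamContext(context);
  }

  public List<TupleStream> getChildren() {
    return Collections.singletonList(stream);
  }

  public StreamComparator getStreamSort() {
    return stream.getStreamSort();
  }

  public Explanation toExplanation(StreamFactory factory) throws IOException {
    return stream.toExplanation(factory); // sketch-level only
  }

  public void open() throws IOException { stream.open(); }
  public void close() throws IOException { stream.close(); }

  public Tuple read() throws IOException {
    if (++count > limit) {
      // The EOF tuple ends the stream early; the resulting close on the
      // workers triggers the Broken Pipe at the /export handler.
      Map<String, Object> fields = new HashMap<>();
      fields.put("EOF", true);
      return new Tuple(fields);
    }
    return stream.read();
  }
}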

As far as the worker/replica ratio goes, this will depend on how heavy the
export is. If it's a light export (a small number of fields, mostly numeric,
simple sort params), then I've seen a ratio of 5 (workers) to 1 (replica)
work well. This will basically saturate the CPU on the replica. But heavier
exports will saturate the replicas with fewer workers.

Also, I tend to use Direct DocValues to get the best performance. I'm not
sure how much difference this makes, but it should eliminate the
compression overhead when fetching the data from the DocValues.

Varun's suggestion of using the binary transport will provide a nice
performance increase as well, but you'll need to upgrade. You may need to
do that anyway, as the fix for the early stream close will be on a later
version that was refactored to support the binary transport.
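
For reference, once you're on a version with the binary transport, sending
an expression with SolrStream might look like the sketch below (whether
wt=javabin is what selects the JavabinTupleStreamParser is an assumption
to verify against that release):

import java.io.IOException;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class StreamClient {
  public static void main(String[] args) throws IOException {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("expr", "parallel(...)"); // your full expression here
    params.set("qt", "/stream");
    params.set("wt", "javabin"); // assumption: selects the binary parser

    SolrStream stream = new SolrStream("http://host:8983/solr/collection1",
        params);
    try {
      stream.open();
      Tuple tuple = stream.read();
      while (!tuple.EOF) {
        // process the tuple...
        tuple = stream.read();
      }
    } finally {
      stream.close();
    }
  }
}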

Joel Bernstein
http://joelsolr.blogspot.com/

On Tue, May 16, 2017 at 8:03 PM, Joel Bernstein <joels...@gmail.com> wrote:

> Yep, saw it. I'll comment on the ticket for what I believe needs to be
> done.
>
> Joel Bernstein
> http://joelsolr.blogspot.com/
>
> On Tue, May 16, 2017 at 8:00 PM, Varun Thacker <va...@vthacker.in> wrote:
>
>> Hi Joel, Susmit
>>
>> I created https://issues.apache.org/jira/browse/SOLR-10698 to track the
>> issue
>>
>> @Susmit looking at the stack trace I see the expression is using
>> JSONTupleStream. I wonder if using JavabinTupleStreamParser could help
>> improve performance?
>>
>> On Tue, May 16, 2017 at 9:39 AM, Susmit Shukla <shukla.sus...@gmail.com>
>> wrote:
>>
>> > Hi Joel,
>> >
>> > queries can be arbitrarily nested with AND/OR/NOT joins e.g.
>> >
>> > (intersect(intersect(search, search), union(search, search))). If I
>> > cut off the innermost stream with a limit, the complete intersection
>> > would not happen at upper levels. Also, would the limit stream have
>> > the same effect as using the /select handler with the rows parameter?
>> > I am trying to force the input stream close through reflection, just
>> > to see if it gives performance gains.
>> >
>> > 2) I would experiment with null streams. Is workers = number of
>> > replicas in the data collection a good rule of thumb? Is
>> > ParallelStream performance upper-bounded by the number of replicas?
>> >
>> > Thanks,
>> > Susmit
>> >
>> > On Tue, May 16, 2017 at 5:59 AM, Joel Bernstein <joels...@gmail.com>
>> > wrote:
>> >
>> > > Your approach looks OK. The single-sharded worker collection is
>> > > only needed if you were using CloudSolrStream to send the initial
>> > > Streaming Expression to the /stream handler. You are not doing
>> > > this, so your approach is fine.
>> > >
>> > > Here are some thoughts on what you described:
>> > >
>> > > 1) If you are closing the parallel stream after the top 1000
>> > > results, then try wrapping the intersect in a LimitStream. This
>> > > stream doesn't exist yet, so it will be a custom stream. The
>> > > LimitStream can return the EOF tuple after it reads N tuples. This
>> > > will cause the worker nodes to close the underlying stream and
>> > > cause the Broken Pipe exception to occur at the /export handler,
>> > > which will stop the /export.
>> > >
>> > > Here is the basic approach:
>> > >
>> > > parallel(limit(intersect(search, search)))
>> > >
>> > >
>> > > 2) It can be tricky to understand where the bottleneck lies when
>> > > using the ParallelStream for parallel relational algebra. You can
>> > > use the NullStream to get an understanding of why performance is
>> > > not increasing when you increase the workers. Here is the basic
>> > > approach:
>> > >
>> > > parallel(null(intersect(search, search)))
>> > >
>> > > The NullStream will eat all the tuples on the workers and return a
>> > > single tuple with the tuple count and the time taken to run the
>> > > expression. So you'll get one tuple from each worker. This will
>> > > eliminate any bottleneck on tuples returning through the
>> > > ParallelStream and you can focus on the performance of the
>> > > intersect and the /export handler.
>> > >
>> > > Then experiment with:
>> > >
>> > > 1) Increasing the number of parallel workers.
>> > > 2) Increasing the number of replicas in the data collections.
>> > >
>> > > And watch the timing information coming back from the NullStream
>> > > tuples. If increasing the workers is not improving performance then
>> > > the bottleneck may be in the /export handler. So try increasing
>> > > replicas and see if that improves performance. Different partitions
>> > > of the streams will be served by different replicas.
>> > >
>> > > If performance doesn't improve with the NullStream after increasing
>> > > both workers and replicas then we know the bottleneck is the
>> > > network.
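>> > >
>> > > Since you're embedding the streams in your application, here is a
>> > > rough sketch of the NullStream test built programmatically. The
>> > > zkHost, collection and field names are assumptions, constructor
>> > > signatures shifted a bit across 6.x, and I'm recalling the
>> > > nullCount/timer tuple fields from memory, so verify all of this
>> > > against your version (the fragment assumes an enclosing method
>> > > that throws IOException):
>> > >
>> > > import org.apache.solr.client.solrj.io.SolrClientCache;
>> > > import org.apache.solr.client.solrj.io.Tuple;
>> > > import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
>> > > import org.apache.solr.client.solrj.io.comp.FieldComparator;
>> > > import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
>> > > import org.apache.solr.client.solrj.io.stream.*;
>> > > import org.apache.solr.common.params.ModifiableSolrParams;
>> > >
>> > > String zkHost = "zk1:2181/solr"; // assumption
>> > > ModifiableSolrParams params = new ModifiableSolrParams();
>> > > params.set("q", "*:*");
>> > > params.set("fl", "id");
>> > > params.set("sort", "id asc");
>> > > params.set("qt", "/export");
>> > > params.set("partitionKeys", "id"); // required under parallel()
>> > >
>> > > TupleStream a = new CloudSolrStream(zkHost, "collectionA", params);
>> > > TupleStream b = new CloudSolrStream(zkHost, "collectionB", params);
>> > > TupleStream intersect =
>> > >     new IntersectStream(a, b, new FieldEqualitor("id"));
>> > > TupleStream nullStream = new NullStream(intersect);
>> > > ParallelStream parallel = new ParallelStream(zkHost, "workers",
>> > >     nullStream, 4, new FieldComparator("nullCount",
>> > >     ComparatorOrder.DESCENDING));
>> > >
>> > > StreamContext context = new StreamContext();
>> > > context.setSolrClientCache(new SolrClientCache());
>> > > parallel.setStreamContext(context);
>> > > parallel.open();
>> > > Tuple t = parallel.read();
>> > > while (!t.EOF) {
>> > >   // one timing tuple per worker
>> > >   System.out.println(t.get("nullCount") + " tuples, " + t.get("timer"));
>> > >   t = parallel.read();
>> > > }
>> > > parallel.close();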
>> > >
>> > > Joel Bernstein
>> > > http://joelsolr.blogspot.com/
>> > >
>> > > On Mon, May 15, 2017 at 10:37 PM, Susmit Shukla <
>> shukla.sus...@gmail.com
>> > >
>> > > wrote:
>> > >
>> > > > Hi Joel,
>> > > >
>> > > > Regarding the implementation, I am wrapping the topmost TupleStream
>> > > > in a ParallelStream and executing it on the worker cluster (one of
>> > > > the joined clusters doubles up as the worker cluster).
>> > > > ParallelStream does submit the query to the /stream handler.
>> > > > For #2, e.g. I am creating 2 CloudSolrStreams, wrapping them in an
>> > > > IntersectStream, wrapping that in a ParallelStream, and reading out
>> > > > the tuples from the parallel stream. close() is called on the
>> > > > ParallelStream. I do have custom streams, but they are similar to
>> > > > IntersectStream.
>> > > > I am on Solr 6.3.1.
>> > > > The 2 Solr clusters serving the join queries have many shards. The
>> > > > worker collection is also multi-sharded and is one from the main
>> > > > clusters, so do you imply I should be using a single-sharded
>> > > > "worker" collection? Would the joins execute faster?
>> > > > On a side note, increasing the workers beyond 1 was not improving
>> > > > the execution times, and was degrading them with 3 workers and
>> > > > above. That is counterintuitive, since the joins are huge and
>> > > > adding more workers should have improved the performance.
>> > > >
>> > > > Thanks,
>> > > > Susmit
>> > > >
>> > > >
>> > > > On Mon, May 15, 2017 at 6:47 AM, Joel Bernstein <joels...@gmail.com
>> >
>> > > > wrote:
>> > > >
>> > > > > Ok, please do report any issues you run into. This is quite a
>> > > > > good bug report.
>> > > > >
>> > > > > I reviewed the code and I believe I see the problem. The problem
>> > > > > seems to be that the output code from the /stream handler is not
>> > > > > properly accounting for client disconnects by closing the
>> > > > > underlying stream. What I see in the code is that exceptions
>> > > > > coming from read() in the stream do automatically close the
>> > > > > underlying stream, but exceptions from the writing of the stream
>> > > > > do not close the stream. This needs to be fixed.
>> > > > >
>> > > > > A few questions about your streaming implementation:
>> > > > >
>> > > > > 1) Are you sending requests to the /stream handler? Or are you
>> > > embedding
>> > > > > CloudSolrStream in your application and bypassing the /stream
>> > handler?
>> > > > >
>> > > > > 2) If you're sending Streaming Expressions to the stream handler
>> are
>> > > you
>> > > > > using SolrStream or CloudSolrStream to send the expression?
>> > > > >
>> > > > > 3) What version of Solr are you using?
>> > > > >
>> > > > > 4) Have you implemented any custom streams?
>> > > > >
>> > > > >
>> > > > > #2 is an important question. If you're sending expressions to
>> > > > > the /stream handler using CloudSolrStream, the collection
>> > > > > running the expression has to be set up in a specific way: it
>> > > > > has to be a *single shard collection*. You can have as many
>> > > > > replicas as you want, but only one shard. That's because
>> > > > > CloudSolrStream picks one replica in each shard to forward the
>> > > > > request to, then merges the results from the shards. So if you
>> > > > > send in an expression using CloudSolrStream, that expression
>> > > > > will be sent to each shard to be run, and each shard will
>> > > > > duplicate the work and return duplicate results.
>> > > > >
>> > > > > Joel Bernstein
>> > > > > http://joelsolr.blogspot.com/
>> > > > >
>> > > > > On Sat, May 13, 2017 at 7:03 PM, Susmit Shukla <
>> > > shukla.sus...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Thanks Joel.
>> > > > > > Streaming is awesome; I just did a huge implementation of it in
>> > > > > > my project. I found a couple more issues with streaming and did
>> > > > > > local hacks for them; I would raise those too.
>> > > > > >
>> > > > > > On Sat, May 13, 2017 at 2:09 PM, Joel Bernstein <
>> > joels...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Ah, then this is unexpected behavior. Can you open a ticket
>> > > > > > > for this?
>> > > > > > >
>> > > > > > > Joel Bernstein
>> > > > > > > http://joelsolr.blogspot.com/
>> > > > > > >
>> > > > > > > On Sat, May 13, 2017 at 2:51 PM, Susmit Shukla <
>> > > > > shukla.sus...@gmail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Joel,
>> > > > > > > >
>> > > > > > > > I was using CloudSolrStream for the above test. Below is
>> > > > > > > > the call stack.
>> > > > > > > >
>> > > > > > > > at org.apache.http.impl.io.ChunkedInputStream.read(ChunkedInputStream.java:215)
>> > > > > > > > at org.apache.http.impl.io.ChunkedInputStream.close(ChunkedInputStream.java:316)
>> > > > > > > > at org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed(ResponseEntityProxy.java:128)
>> > > > > > > > at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
>> > > > > > > > at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
>> > > > > > > > at sun.nio.cs.StreamDecoder.implClose(StreamDecoder.java:378)
>> > > > > > > > at sun.nio.cs.StreamDecoder.close(StreamDecoder.java:193)
>> > > > > > > > at java.io.InputStreamReader.close(InputStreamReader.java:199)
>> > > > > > > > at org.apache.solr.client.solrj.io.stream.JSONTupleStream.close(JSONTupleStream.java:91)
>> > > > > > > > at org.apache.solr.client.solrj.io.stream.SolrStream.close(SolrStream.java:186)
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Susmit
>> > > > > > > >
>> > > > > > > > On Sat, May 13, 2017 at 10:48 AM, Joel Bernstein <
>> > > > joels...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > I was just reading the Java docs on ChunkedInputStream:
>> > > > > > > > >
>> > > > > > > > > "Note that this class NEVER closes the underlying stream"
>> > > > > > > > >
>> > > > > > > > > In that scenario the /export would indeed continue to
>> > > > > > > > > send data. I think we can consider this an anti-pattern
>> > > > > > > > > for the /export handler currently.
>> > > > > > > > >
>> > > > > > > > > I would suggest using one of the Streaming Clients to
>> > > > > > > > > connect to the /export handler. Either CloudSolrStream
>> > > > > > > > > or SolrStream will interact with the /export handler in
>> > > > > > > > > the way that it expects.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Joel Bernstein
>> > > > > > > > > http://joelsolr.blogspot.com/
>> > > > > > > > >
>> > > > > > > > > On Sat, May 13, 2017 at 12:28 PM, Susmit Shukla <
>> > > > > > > shukla.sus...@gmail.com
>> > > > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Joel,
>> > > > > > > > > >
>> > > > > > > > > > I did not observe that. On calling close() on the
>> > > > > > > > > > stream, it cycled through all the hits that the
>> > > > > > > > > > /export handler calculated.
>> > > > > > > > > > e.g. with a *:* query and the export handler on a
>> > > > > > > > > > 100k document index, I could see the 100,000th record
>> > > > > > > > > > printed in the HTTP wire debug log although close()
>> > > > > > > > > > was called after reading the 1st tuple. The time
>> > > > > > > > > > taken for the operation with the close() call was the
>> > > > > > > > > > same as if I had read all the 100k tuples.
>> > > > > > > > > > As I have pointed out, close() on the underlying
>> > > > > > > > > > ChunkedInputStream calls read(), and the Solr server
>> > > > > > > > > > probably has no way to distinguish it from the read()
>> > > > > > > > > > of regular tuple reads.
>> > > > > > > > > > I think there should be an abort() API for Solr
>> > > > > > > > > > streams that hooks into HttpMethod.abort(). That
>> > > > > > > > > > would enable the client to disconnect early, and that
>> > > > > > > > > > would probably disconnect the underlying socket, so
>> > > > > > > > > > there would be no leaks.
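>> > > > > > > > > >
>> > > > > > > > > > For what it's worth, the idea in raw HttpClient 4.x
>> > > > > > > > > > terms (a hypothetical sketch, not an existing Solr
>> > > > > > > > > > API) would be:
>> > > > > > > > > >
>> > > > > > > > > > CloseableHttpClient client = HttpClients.createDefault();
>> > > > > > > > > > HttpGet get = new HttpGet(exportUrl); // /export URL
>> > > > > > > > > > CloseableHttpResponse rsp = client.execute(get);
>> > > > > > > > > > InputStream in = rsp.getEntity().getContent();
>> > > > > > > > > > // ... read the first N tuples from `in` ...
>> > > > > > > > > > get.abort(); // kills the connection without draining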
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > > Susmit
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Sat, May 13, 2017 at 7:42 AM, Joel Bernstein <
>> > > > > > joels...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > If the client closes the connection to the export
>> > > > > > > > > > > handler then this exception will occur automatically
>> > > > > > > > > > > on the server.
>> > > > > > > > > > >
>> > > > > > > > > > > Joel Bernstein
>> > > > > > > > > > > http://joelsolr.blogspot.com/
>> > > > > > > > > > >
>> > > > > > > > > > > On Sat, May 13, 2017 at 1:46 AM, Susmit Shukla <
>> > > > > > > > > shukla.sus...@gmail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Joel,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for the insight. How can this exception
>> > > > > > > > > > > > be thrown/forced from the client side? The client
>> > > > > > > > > > > > can't do a System.exit() as it is running as a
>> > > > > > > > > > > > webapp.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Susmit
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Fri, May 12, 2017 at 4:44 PM, Joel Bernstein <
>> > > > > > > > joels...@gmail.com>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > In this scenario the /export handler continues
>> > > > > > > > > > > > > to export results until it encounters a "Broken
>> > > > > > > > > > > > > Pipe" exception. This exception is trapped and
>> > > > > > > > > > > > > ignored rather than logged, as it's not
>> > > > > > > > > > > > > considered an exception if the client
>> > > > > > > > > > > > > disconnects early.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Joel Bernstein
>> > > > > > > > > > > > > http://joelsolr.blogspot.com/
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Fri, May 12, 2017 at 2:10 PM, Susmit Shukla <
>> > > > > > > > > > > shukla.sus...@gmail.com>
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > I have a question regarding the Solr /export
>> > > > > > > > > > > > > > handler. Here is the scenario -
>> > > > > > > > > > > > > > I want to use the /export handler - I only
>> > > > > > > > > > > > > > need sorted data and this is the fastest way
>> > > > > > > > > > > > > > to get it. I am doing multiple-level joins
>> > > > > > > > > > > > > > using streams over the /export handler. I
>> > > > > > > > > > > > > > know the number of top-level records to be
>> > > > > > > > > > > > > > retrieved, but not for each individual stream
>> > > > > > > > > > > > > > rolling up to the final result.
>> > > > > > > > > > > > > > I observed that calling close() on an /export
>> > > > > > > > > > > > > > stream is too expensive. It reads the stream
>> > > > > > > > > > > > > > to the very end of the hits. Assuming there
>> > > > > > > > > > > > > > are 100 million hits for each stream, the
>> > > > > > > > > > > > > > first 1k records were found after the joins
>> > > > > > > > > > > > > > and we call close() after that; it would take
>> > > > > > > > > > > > > > many minutes/hours to finish.
>> > > > > > > > > > > > > > Currently I have put the close() call in a
>> > > > > > > > > > > > > > different thread - basically fire and forget.
>> > > > > > > > > > > > > > But the cluster is very strained because of
>> > > > > > > > > > > > > > the unnecessary reads.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Internally, streaming uses HttpClient's
>> > > > > > > > > > > > > > ChunkedInputStream, which has to be drained
>> > > > > > > > > > > > > > in the close() call. But from the server's
>> > > > > > > > > > > > > > point of view, it should stop sending more
>> > > > > > > > > > > > > > data once close() has been issued.
>> > > > > > > > > > > > > > There is a read() call in the close() method
>> > > > > > > > > > > > > > of ChunkedInputStream that is
>> > > > > > > > > > > > > > indistinguishable from a real read(). If the
>> > > > > > > > > > > > > > /export handler stopped sending more data
>> > > > > > > > > > > > > > after close() it would be very useful.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Another option would be to use the /select
>> > > > > > > > > > > > > > handler and get into the business of managing
>> > > > > > > > > > > > > > a custom cursor mark that is based on the
>> > > > > > > > > > > > > > stream sort and is reissued until it fetches
>> > > > > > > > > > > > > > the required records at the topmost level.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Any thoughts?
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > Susmit
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
