Hi Joel, Susmit,

I created https://issues.apache.org/jira/browse/SOLR-10698 to track the
issue.

@Susmit, looking at the stack trace, I see the expression is using
JSONTupleStream. Have you tried JavabinTupleStreamParser? It might help
improve performance.

On Tue, May 16, 2017 at 9:39 AM, Susmit Shukla <shukla.sus...@gmail.com>
wrote:

> Hi Joel,
>
> queries can be arbitrarily nested with AND/OR/NOT joins e.g.
>
> (intersect(intersect(search, search), union(search, search))). If I cut off
> the innermost stream with a limit, the complete intersection would not
> happen at upper levels. Also would the limit stream have same effect as
> using /select handler with rows parameter?
> I am trying to force input stream close through reflection, just to see if
> it gives performance gains.
>
> 2) I will experiment with null streams. Is workers = number of replicas in
> the data collection a good rule of thumb? Is ParallelStream performance
> upper-bounded by the number of replicas?
>
> Thanks,
> Susmit
>
> On Tue, May 16, 2017 at 5:59 AM, Joel Bernstein <joels...@gmail.com>
> wrote:
>
> > Your approach looks OK. The single sharded worker collection is only
> needed
> > if you were using CloudSolrStream to send the initial Streaming
> Expression
> > to the /stream handler. You are not doing this, so your approach is
> fine.
> >
> > Here are some thoughts on what you described:
> >
> > 1) If you are closing the parallel stream after the top 1000 results,
> then
> > try wrapping the intersect in a LimitStream. This stream doesn't exist
> yet
> > so it will be a custom stream. The LimitStream can return the EOF tuple
> > after it reads N tuples. This will cause the worker nodes to close the
> > underlying stream and cause the Broken Pipe exception to occur at the
> > /export handler, which will stop the /export.
> >
> > Here is the basic approach:
> >
> > parallel(limit(intersect(search, search)))
> >
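A minimal sketch of the limit idea described above, using stand-in types rather than Solr's own classes. `SimpleStream`, `ListStream`, and `LimitStream` here are hypothetical names for illustration; a real custom stream would have to implement Solr's TupleStream contract and return its EOF tuple instead of `null`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-in types for illustration only; these are NOT Solr's TupleStream/Tuple classes.
class LimitSketch {

    /** Hypothetical minimal stream contract: read() returns null at end of stream. */
    interface SimpleStream {
        String read();
        void close();
    }

    /** In-memory source that records how many tuples were pulled and whether close() ran. */
    static class ListStream implements SimpleStream {
        private final Iterator<String> it;
        int pulled = 0;
        boolean closed = false;

        ListStream(List<String> tuples) { this.it = tuples.iterator(); }

        @Override public String read() {
            if (!it.hasNext()) return null;
            pulled++;
            return it.next();
        }

        @Override public void close() { closed = true; }
    }

    /** Passes through at most `limit` tuples, then reports end of stream. */
    static class LimitStream implements SimpleStream {
        private final SimpleStream inner;
        private final int limit;
        private int count = 0;

        LimitStream(SimpleStream inner, int limit) {
            this.inner = inner;
            this.limit = limit;
        }

        @Override public String read() {
            if (count >= limit) return null; // signal EOF without reading the inner stream again
            String tuple = inner.read();
            if (tuple != null) count++;
            return tuple;
        }

        @Override public void close() { inner.close(); }
    }

    /** Reads a stream until it reports EOF, then closes it. */
    static List<String> drain(SimpleStream stream) {
        List<String> out = new ArrayList<>();
        try {
            for (String t = stream.read(); t != null; t = stream.read()) out.add(t);
        } finally {
            stream.close();
        }
        return out;
    }

    public static void main(String[] args) {
        ListStream source = new ListStream(Arrays.asList("a", "b", "c", "d", "e"));
        List<String> top = drain(new LimitStream(source, 2));
        System.out.println(top + " pulled=" + source.pulled + " closed=" + source.closed);
    }
}
```

The limit wraps the outermost stream, so the inner streams are only read as far as needed to produce the first N results.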
> >
> > 2) It can be tricky to understand where the bottleneck lies when using
> the
> > ParallelStream for parallel relational algebra. You can use the
> NullStream
> > to get an understanding of why performance is not increasing when you
> > increase the workers. Here is the basic approach:
> >
> > parallel(null(intersect(search, search)))
> >
> > The NullStream will eat all the tuples on the workers and return a single
> > tuple with the tuple count and the time taken to run the expression. So
> > you'll get one tuple from each worker. This will eliminate any bottleneck
> > on tuples returning through the ParallelStream and you can focus on the
> > performance of the intersect and the /export handler.
> >
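To make the null-stream behaviour concrete, here is a rough sketch of "eat all the tuples and return one summary". It is not Solr's NullStream code; the `consume` helper and the `count` / `elapsedMillis` field names are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a plain iterator stands in for the wrapped stream.
class NullSketch {

    /** Consumes every tuple from the source and returns a single summary record. */
    static Map<String, Long> consume(Iterator<String> source) {
        long start = System.nanoTime();
        long count = 0;
        while (source.hasNext()) {
            source.next(); // discard the tuple; only the count is kept
            count++;
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        Map<String, Long> summary = new LinkedHashMap<>();
        summary.put("count", count);
        summary.put("elapsedMillis", elapsedMillis);
        return summary;
    }

    public static void main(String[] args) {
        Map<String, Long> summary = consume(Arrays.asList("t1", "t2", "t3").iterator());
        System.out.println(summary);
    }
}
```

Because only the summary travels back, the timing reflects the work done on the worker rather than the cost of shipping tuples to the client.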
> > Then experiment with:
> >
> > 1) Increasing the number of parallel workers.
> > 2) Increasing the number of replicas in the data collections.
> >
> > And watch the timing information coming back from the NullStream tuples.
> If
> > increasing the workers is not improving performance then the bottleneck
> may
> > be in the /export handler. So try increasing replicas and see if that
> > improves performance. Different partitions of the streams will be served
> by
> > different replicas.
> >
> > If performance doesn't improve with the NullStream after increasing both
> > workers and replicas then we know the bottleneck is the network.
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Mon, May 15, 2017 at 10:37 PM, Susmit Shukla <shukla.sus...@gmail.com
> >
> > wrote:
> >
> > > Hi Joel,
> > >
> > > Regarding the implementation, I am wrapping the topmost TupleStream in
> a
> > > ParallelStream and execute it on the worker cluster (one of the joined
> > > cluster doubles up as worker cluster). ParallelStream does submit the
> > query
> > > to /stream handler.
> > > for #2, for e.g. I am creating 2 CloudSolrStreams , wrapping them in
> > > IntersectStream and wrapping that in ParallelStream and reading out the
> > > tuples from parallel stream. close() is called on parallelStream. I do
> > have
> > > custom streams but that is similar to intersectStream.
> > > I am on solr 6.3.1
> > > The 2 solr clusters serving the join queries are having many shards.
> > Worker
> > > collection is also multi sharded and is one from the main clusters, so
> do
> > > you imply I should be using a single sharded "worker" collection? Would
> > the
> > > joins execute faster?
> > > On a side note, increasing the workers beyond 1 was not improving the
> > > execution times but was degrading if number was 3 and above. That is
> > > counter intuitive since the joins are huge and putting more workers
> > should
> > > have improved the performance.
> > >
> > > Thanks,
> > > Susmit
> > >
> > >
> > > On Mon, May 15, 2017 at 6:47 AM, Joel Bernstein <joels...@gmail.com>
> > > wrote:
> > >
> > > > Ok please do report any issues you run into. This is quite a good bug
> > > > report.
> > > >
> > > > I reviewed the code and I believe I see the problem. The problem
> seems
> > to
> > > > be that output code from the /stream handler is not properly
> accounting
> > > for
> > > > client disconnects and closing the underlying stream. What I see in
> the
> > > > code is that exceptions coming from read() in the stream do
> > automatically
> > > > close the underlying stream. But exceptions from the writing of the
> > > stream
> > > > do not close the stream. This needs to be fixed.
> > > >
> > > > A few questions about your streaming implementation:
> > > >
> > > > 1) Are you sending requests to the /stream handler? Or are you
> > embedding
> > > > CloudSolrStream in your application and bypassing the /stream
> handler?
> > > >
> > > > 2) If you're sending Streaming Expressions to the stream handler are
> > you
> > > > using SolrStream or CloudSolrStream to send the expression?
> > > >
> > > > > 3) What version of Solr are you using?
> > > >
> > > > 4) Have you implemented any custom streams?
> > > >
> > > >
> > > > #2 is an important question. If you're sending expressions to the
> > /stream
> > > > handler using CloudSolrStream the collection running the expression
> > would
> > > > > have to be set up in a specific way. The collection running the
> expression
> > > will
> > > > > have to be a *single shard collection*. You can have as many replicas
> > as
> > > > you want but only one shard. That's because CloudSolrStream picks one
> > > > replica in each shard to forward the request to then merges the
> results
> > > > from the shards. So if you send in an expression using
> CloudSolrStream
> > > that
> > > > expression will be sent to each shard to be run and each shard will
> be
> > > > duplicating the work and return duplicate results.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Joel Bernstein
> > > > http://joelsolr.blogspot.com/
> > > >
> > > > On Sat, May 13, 2017 at 7:03 PM, Susmit Shukla <
> > shukla.sus...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Joel
> > > > > Streaming is awesome, just had a huge implementation in my
> project. I
> > > > found
> > > > > out a couple more issues with streaming and did local hacks for
> them,
> > > > would
> > > > > raise them too.
> > > > >
> > > > > On Sat, May 13, 2017 at 2:09 PM, Joel Bernstein <
> joels...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Ah, then this is unexpected behavior. Can you open a ticket for
> > this?
> > > > > >
> > > > > > Joel Bernstein
> > > > > > http://joelsolr.blogspot.com/
> > > > > >
> > > > > > On Sat, May 13, 2017 at 2:51 PM, Susmit Shukla <
> > > > shukla.sus...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Joel,
> > > > > > >
> > > > > > > I was using CloudSolrStream for the above test. Below is the
> call
> > > > > stack.
> > > > > > >
> > > > > > > at
> > > > > > > org.apache.http.impl.io.ChunkedInputStream.read(
> > > > > > > ChunkedInputStream.java:215)
> > > > > > > at
> > > > > > > org.apache.http.impl.io.ChunkedInputStream.close(
> > > > > > > ChunkedInputStream.java:316)
> > > > > > > at
> > > > > > > org.apache.http.impl.execchain.ResponseEntityProxy.
> streamClosed(
> > > > > > > ResponseEntityProxy.java:128)
> > > > > > > at
> > > > > > > org.apache.http.conn.EofSensorInputStream.checkClose(
> > > > > > > EofSensorInputStream.java:228)
> > > > > > > at
> > > > > > > org.apache.http.conn.EofSensorInputStream.close(
> > > > > > > EofSensorInputStream.java:174)
> > > > > > > at sun.nio.cs.StreamDecoder.implClose(StreamDecoder.java:378)
> > > > > > > at sun.nio.cs.StreamDecoder.close(StreamDecoder.java:193)
> > > > > > > at java.io.InputStreamReader.close(InputStreamReader.java:199)
> > > > > > > at
> > > > > > > org.apache.solr.client.solrj.io.stream.JSONTupleStream.
> > > > > > > close(JSONTupleStream.java:91)
> > > > > > > at
> > > > > > > org.apache.solr.client.solrj.io.stream.SolrStream.close(
> > > > > > > SolrStream.java:186)
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Susmit
> > > > > > >
> > > > > > > On Sat, May 13, 2017 at 10:48 AM, Joel Bernstein <
> > > joels...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > I was just reading the Java docs on the ChunkedInputStream.
> > > > > > > >
> > > > > > > > "Note that this class NEVER closes the underlying stream"
> > > > > > > >
> > > > > > > > In that scenario the /export would indeed continue to send
> > data.
> > > I
> > > > > > think
> > > > > > > we
> > > > > > > > can consider this an anti-pattern for the /export handler
> > > > currently.
> > > > > > > >
> > > > > > > > I would suggest using one of the Streaming Clients to connect
> > to
> > > > the
> > > > > > > export
> > > > > > > > handler. Either CloudSolrStream or SolrStream will both
> > interact
> > > > with
> > > > > > the
> > > > > > > > > /export handler in the way that it expects.
> > > > > > > >
> > > > > > > >
> > > > > > > > Joel Bernstein
> > > > > > > > http://joelsolr.blogspot.com/
> > > > > > > >
> > > > > > > > On Sat, May 13, 2017 at 12:28 PM, Susmit Shukla <
> > > > > > shukla.sus...@gmail.com
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Joel,
> > > > > > > > >
> > > > > > > > > I did not observe that. On calling close() on stream, it
> > cycled
> > > > > > through
> > > > > > > > all
> > > > > > > > > the hits that /export handler calculated.
> > > > > > > > > e.g. with a *:* query and export handler on a 100k document
> > > > index,
> > > > > I
> > > > > > > > could
> > > > > > > > > see the 100kth record printed on the http wire debug log
> > > although
> > > > > > close
> > > > > > > > was
> > > > > > > > > called after reading 1st tuple. The time taken for the
> > > operation
> > > > > with
> > > > > > > > > close() call was same as that if I had read all the 100k
> > > tuples.
> > > > > > > > > As I have pointed out, close() on underlying
> > ChunkedInputStream
> > > > > calls
> > > > > > > > > read() and solr server has probably no way to distinguish
> it
> > > from
> > > > > > > read()
> > > > > > > > > > happening from regular tuple reads.
> > > > > > > > > I think there should be an abort() API for solr streams
> that
> > > > hooks
> > > > > > into
> > > > > > > > > httpmethod.abort() . That would enable client to disconnect
> > > early
> > > > > and
> > > > > > > > > probably that would disconnect the underlying socket so
> there
> > > > would
> > > > > > be
> > > > > > > no
> > > > > > > > > leaks.
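
The difference between a draining close() and an abort can be simulated with plain java.io streams. This is only a sketch: `CountingSource`, `DrainOnClose`, and `Abortable` are hypothetical stand-ins, not HttpClient or Solr classes, and the byte counter stands in for data the server would have to send.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Simulation only: no real HTTP connection is involved.
class DrainSketch {

    /** Simulated response body: yields `total` bytes and counts how many were pulled. */
    static class CountingSource extends InputStream {
        private final long total;
        long produced = 0;

        CountingSource(long total) { this.total = total; }

        @Override public int read() {
            if (produced >= total) return -1;
            produced++;
            return 'x';
        }
    }

    /** close() reads to end of stream first, mimicking a drain-on-close client stream. */
    static class DrainOnClose extends FilterInputStream {
        DrainOnClose(InputStream in) { super(in); }

        @Override public void close() throws IOException {
            byte[] buf = new byte[2048];
            while (in.read(buf) >= 0) {
                // discard the remaining content
            }
            super.close();
        }
    }

    /** abort() releases the stream without reading the remainder. */
    static class Abortable extends FilterInputStream {
        Abortable(InputStream in) { super(in); }

        void abort() throws IOException { in.close(); }
    }

    /** Reads one byte, then closes; returns how many bytes the source had to produce. */
    static long pulledAfterClose(long total) {
        CountingSource source = new CountingSource(total);
        try {
            DrainOnClose stream = new DrainOnClose(source);
            stream.read();
            stream.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.produced;
    }

    /** Reads one byte, then aborts; returns how many bytes the source had to produce. */
    static long pulledAfterAbort(long total) {
        CountingSource source = new CountingSource(total);
        try {
            Abortable stream = new Abortable(source);
            stream.read();
            stream.abort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return source.produced;
    }

    public static void main(String[] args) {
        System.out.println("close(): " + pulledAfterClose(100_000) + " bytes pulled");
        System.out.println("abort(): " + pulledAfterAbort(100_000) + " bytes pulled");
    }
}
```

In the simulation the draining close() pulls the whole body even though only one byte was wanted, which matches the 100k-document observation above; the abort path stops at the first byte.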
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Susmit
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sat, May 13, 2017 at 7:42 AM, Joel Bernstein <
> > > > > joels...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > If the client closes the connection to the export handler
> > > then
> > > > > this
> > > > > > > > > > exception will occur automatically on the server.
> > > > > > > > > >
> > > > > > > > > > Joel Bernstein
> > > > > > > > > > http://joelsolr.blogspot.com/
> > > > > > > > > >
> > > > > > > > > > On Sat, May 13, 2017 at 1:46 AM, Susmit Shukla <
> > > > > > > > shukla.sus...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Joel,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the insight. How can this exception be
> > > > thrown/forced
> > > > > > > from
> > > > > > > > > > client
> > > > > > > > > > > > side? Client can't do a System.exit() as it is running
> > as a
> > > > > > webapp.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Susmit
> > > > > > > > > > >
> > > > > > > > > > > On Fri, May 12, 2017 at 4:44 PM, Joel Bernstein <
> > > > > > > joels...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > In this scenario the /export handler continues to
> > export
> > > > > > results
> > > > > > > > > until
> > > > > > > > > > it
> > > > > > > > > > > > encounters a "Broken Pipe" exception. This exception
> is
> > > > > trapped
> > > > > > > and
> > > > > > > > > > > ignored
> > > > > > > > > > > > > rather than logged as it's not considered an
> exception
> > if
> > > > the
> > > > > > > > client
> > > > > > > > > > > > disconnects early.
> > > > > > > > > > > >
> > > > > > > > > > > > Joel Bernstein
> > > > > > > > > > > > http://joelsolr.blogspot.com/
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, May 12, 2017 at 2:10 PM, Susmit Shukla <
> > > > > > > > > > shukla.sus...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have a question regarding solr /export handler.
> > Here
> > > is
> > > > > the
> > > > > > > > > > scenario
> > > > > > > > > > > -
> > > > > > > > > > > > > I want to use the /export handler - I only need
> > sorted
> > > > data
> > > > > > and
> > > > > > > > > this
> > > > > > > > > > is
> > > > > > > > > > > > the
> > > > > > > > > > > > > fastest way to get it. I am doing multiple level
> > joins
> > > > > using
> > > > > > > > > streams
> > > > > > > > > > > > using
> > > > > > > > > > > > > /export handler. I know the number of top level
> > records
> > > > to
> > > > > be
> > > > > > > > > > retrieved
> > > > > > > > > > > > but
> > > > > > > > > > > > > not for each individual stream rolling up to the
> > final
> > > > > > result.
> > > > > > > > > > > > > I observed that calling close() on a /export stream
> > is
> > > > too
> > > > > > > > > expensive.
> > > > > > > > > > > It
> > > > > > > > > > > > > reads the stream to the very end of hits. Assuming
> > > there
> > > > > are
> > > > > > > 100
> > > > > > > > > > > million
> > > > > > > > > > > > > > hits for each stream, and the first 1k records were found
> > after
> > > > > joins
> > > > > > > and
> > > > > > > > > we
> > > > > > > > > > > call
> > > > > > > > > > > > > close() after that, it would take many
> minutes/hours
> > to
> > > > > > finish
> > > > > > > > it.
> > > > > > > > > > > > > Currently I have put close() call in a different
> > > thread -
> > > > > > > > basically
> > > > > > > > > > > fire
> > > > > > > > > > > > > and forget. But the cluster is very strained
> because
> > of
> > > > the
> > > > > > > > > > > > unnecessary
> > > > > > > > > > > > > reads.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Internally streaming uses ChunkedInputStream of
> > > > HttpClient
> > > > > > and
> > > > > > > it
> > > > > > > > > has
> > > > > > > > > > > to
> > > > > > > > > > > > be
> > > > > > > > > > > > > drained in the close() call. But from server point
> of
> > > > view,
> > > > > > it
> > > > > > > > > should
> > > > > > > > > > > > stop
> > > > > > > > > > > > > sending more data once close() has been issued.
> > > > > > > > > > > > > There is a read() call in close() method of
> > > > > > ChunkedInputStream
> > > > > > > > that
> > > > > > > > > > is
> > > > > > > > > > > > > indistinguishable from real read(). If /export
> > handler
> > > > > stops
> > > > > > > > > sending
> > > > > > > > > > > more
> > > > > > > > > > > > > data after close it would be very useful.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Another option would be to use /select handler and
> > get
> > > > into
> > > > > > > > > business
> > > > > > > > > > of
> > > > > > > > > > > > > managing a custom cursor mark that is based on the
> > > stream
> > > > > > sort
> > > > > > > > and
> > > > > > > > > is
> > > > > > > > > > > > reset
> > > > > > > > > > > > > until it fetches the required records at topmost
> > level.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Any thoughts?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Susmit
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
