Your approach looks OK. The single-sharded worker collection would only be
needed if you were using CloudSolrStream to send the initial Streaming
Expression to the /stream handler. You are not doing this, so your approach is
fine.

Here are some thoughts on what you described:

1) If you are closing the parallel stream after the top 1000 results, then
try wrapping the intersect in a LimitStream. This stream doesn't exist yet,
so it will be a custom stream. The LimitStream can return the EOF tuple
after it reads N tuples. This will cause the worker nodes to close the
underlying stream, which in turn causes a Broken Pipe exception at the
/export handler and stops the /export.

Here is the basic approach:

parallel(limit(intersect(search, search)))
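
A rough sketch of what that custom stream might look like against the 6.x
TupleStream API (the class doesn't ship with Solr; delegating toExplanation()
to the child is just a shortcut, and the Expressible plumbing that parallel()
needs to ship the expression to the workers is omitted):

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

// Hypothetical LimitStream: passes through at most `limit` tuples from the
// wrapped stream, then emits the EOF tuple.
public class LimitStream extends TupleStream {

  private final TupleStream stream;
  private final int limit;
  private int count;

  public LimitStream(TupleStream stream, int limit) {
    this.stream = stream;
    this.limit = limit;
  }

  public void open() throws IOException { stream.open(); }
  public void close() throws IOException { stream.close(); }
  public void setStreamContext(StreamContext context) { stream.setStreamContext(context); }
  public List<TupleStream> children() { return Collections.singletonList(stream); }
  public StreamComparator getStreamSort() { return stream.getStreamSort(); }

  // A real implementation would build its own Explanation; delegating keeps
  // the sketch small.
  public Explanation toExplanation(StreamFactory factory) throws IOException {
    return stream.toExplanation(factory);
  }

  public Tuple read() throws IOException {
    if (++count > limit) {
      // Emitting EOF early makes the worker close the underlying streams,
      // which triggers the Broken Pipe at the /export handler.
      Map m = new HashMap();
      m.put("EOF", true);
      return new Tuple(m);
    }
    return stream.read();
  }
}

Since you're building the streams programmatically you could just wrap your
intersect directly, e.g. new LimitStream(intersectStream, 1000). To use it
from an expression instead, the workers would also need it registered with
their StreamFactory, e.g. withFunctionName("limit", LimitStream.class), plus
the (StreamExpression, StreamFactory) constructor and toExpression() that the
factory expects.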


2) It can be tricky to understand where the bottleneck lies when using the
ParallelStream for parallel relational algebra. You can use the NullStream
to get an understanding of why performance is not increasing when you
increase the workers. Here is the basic approach:

parallel(null(intersect(search, search)))

The NullStream will eat all the tuples on the workers and return a single
tuple with the tuple count and the time taken to run the expression. So
you'll get one tuple from each worker. This will eliminate any bottleneck
on tuples returning through the ParallelStream and you can focus on the
performance of the intersect and the /export handler.
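
As a minimal SolrJ sketch, sending that expression to the /stream handler and
printing the per-worker timing tuples might look like this (the URL,
collection names, field names, and worker count are placeholders; it assumes
the data collections export a sortable id field and that null() is available
in your Solr version, since it's a recent addition):

import java.io.IOException;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class NullStreamCheck {
  public static void main(String[] args) throws IOException {
    // colA and colB are the data collections; workerCollection is where the
    // parallel workers run.
    String expr =
        "parallel(workerCollection,"
      + "  null(intersect("
      + "    search(colA, q=\"*:*\", fl=\"id\", sort=\"id asc\", qt=\"/export\", partitionKeys=\"id\"),"
      + "    search(colB, q=\"*:*\", fl=\"id\", sort=\"id asc\", qt=\"/export\", partitionKeys=\"id\"),"
      + "    on=\"id\")),"
      + "  workers=4, sort=\"nullCount desc\")";

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("expr", expr);
    params.set("qt", "/stream");

    // SolrStream targets a single node's /stream handler directly, which
    // sidesteps the single-shard CloudSolrStream caveat discussed below.
    SolrStream stream = new SolrStream("http://localhost:8983/solr/workerCollection", params);
    try {
      stream.open();
      for (Tuple t = stream.read(); !t.EOF; t = stream.read()) {
        // One tuple per worker, carrying the tuple count and elapsed time.
        System.out.println(t.getFields());
      }
    } finally {
      stream.close();
    }
  }
}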

Then experiment with:

1) Increasing the number of parallel workers.
2) Increasing the number of replicas in the data collections.

And watch the timing information coming back in the NullStream tuples. If
increasing the workers does not improve performance, then the bottleneck may
be in the /export handler. In that case, try increasing replicas and see if
that improves performance; different partitions of the streams will be served
by different replicas.

If performance doesn't improve with the NullStream after increasing both
workers and replicas, then we know the bottleneck is the network.

Joel Bernstein
http://joelsolr.blogspot.com/

On Mon, May 15, 2017 at 10:37 PM, Susmit Shukla <shukla.sus...@gmail.com>
wrote:

> Hi Joel,
>
> Regarding the implementation, I am wrapping the topmost TupleStream in a
> ParallelStream and executing it on the worker cluster (one of the joined
> clusters doubles up as the worker cluster). ParallelStream does submit the
> query to the /stream handler.
> For #2, e.g. I am creating 2 CloudSolrStreams, wrapping them in an
> IntersectStream, wrapping that in a ParallelStream, and reading the tuples
> out of the parallel stream. close() is called on the parallelStream. I do
> have custom streams, but they are similar to the intersectStream.
> I am on solr 6.3.1
> The 2 solr clusters serving the join queries have many shards. The worker
> collection is also multi-sharded and is one of the main clusters, so do
> you imply I should be using a single-sharded "worker" collection? Would the
> joins execute faster?
> On a side note, increasing the workers beyond 1 was not improving the
> execution times and was degrading them for 3 workers and above. That is
> counter-intuitive, since the joins are huge and putting more workers should
> have improved the performance.
>
> Thanks,
> Susmit
>
>
> On Mon, May 15, 2017 at 6:47 AM, Joel Bernstein <joels...@gmail.com>
> wrote:
>
> > Ok, please do report any issues you run into. This is quite a good bug
> > report.
> >
> > I reviewed the code and I believe I see the problem. The problem seems to
> > be that the output code from the /stream handler is not properly
> > accounting for client disconnects and closing the underlying stream. What
> > I see in the code is that exceptions coming from read() in the stream do
> > automatically close the underlying stream, but exceptions from the
> > writing of the stream do not close the stream. This needs to be fixed.
> >
> > A few questions about your streaming implementation:
> >
> > 1) Are you sending requests to the /stream handler? Or are you embedding
> > CloudSolrStream in your application and bypassing the /stream handler?
> >
> > 2) If you're sending Streaming Expressions to the stream handler are you
> > using SolrStream or CloudSolrStream to send the expression?
> >
> > 3) What version of Solr are you using?
> >
> > 4) Have you implemented any custom streams?
> >
> >
> > #2 is an important question. If you're sending expressions to the /stream
> > handler using CloudSolrStream, the collection running the expression would
> > have to be set up a specific way: it will have to be a *single shard
> > collection*. You can have as many replicas as you want, but only one
> > shard. That's because CloudSolrStream picks one replica in each shard to
> > forward the request to, then merges the results from the shards. So if
> > you send in an expression using CloudSolrStream, that expression will be
> > sent to each shard to be run, and each shard will duplicate the work and
> > return duplicate results.
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Sat, May 13, 2017 at 7:03 PM, Susmit Shukla <shukla.sus...@gmail.com>
> > wrote:
> >
> > > Thanks Joel
> > > Streaming is awesome; I just had a huge implementation in my project. I
> > > found a couple more issues with streaming and did local hacks for them;
> > > I would raise them too.
> > >
> > > On Sat, May 13, 2017 at 2:09 PM, Joel Bernstein <joels...@gmail.com>
> > > wrote:
> > >
> > > > Ah, then this is unexpected behavior. Can you open a ticket for this?
> > > >
> > > > Joel Bernstein
> > > > http://joelsolr.blogspot.com/
> > > >
> > > > On Sat, May 13, 2017 at 2:51 PM, Susmit Shukla <shukla.sus...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Joel,
> > > > >
> > > > > I was using CloudSolrStream for the above test. Below is the call
> > > > > stack.
> > > > >
> > > > > at org.apache.http.impl.io.ChunkedInputStream.read(ChunkedInputStream.java:215)
> > > > > at org.apache.http.impl.io.ChunkedInputStream.close(ChunkedInputStream.java:316)
> > > > > at org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed(ResponseEntityProxy.java:128)
> > > > > at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
> > > > > at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
> > > > > at sun.nio.cs.StreamDecoder.implClose(StreamDecoder.java:378)
> > > > > at sun.nio.cs.StreamDecoder.close(StreamDecoder.java:193)
> > > > > at java.io.InputStreamReader.close(InputStreamReader.java:199)
> > > > > at org.apache.solr.client.solrj.io.stream.JSONTupleStream.close(JSONTupleStream.java:91)
> > > > > at org.apache.solr.client.solrj.io.stream.SolrStream.close(SolrStream.java:186)
> > > > >
> > > > > Thanks,
> > > > > Susmit
> > > > >
> > > > > On Sat, May 13, 2017 at 10:48 AM, Joel Bernstein <joels...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I was just reading the Java docs on the ChunkedInputStream.
> > > > > >
> > > > > > "Note that this class NEVER closes the underlying stream"
> > > > > >
> > > > > > In that scenario the /export would indeed continue to send data. I
> > > > > > think we can consider this an anti-pattern for the /export handler
> > > > > > currently.
> > > > > >
> > > > > > I would suggest using one of the Streaming Clients to connect to
> > > > > > the /export handler. Either CloudSolrStream or SolrStream will
> > > > > > interact with the /export handler in the way that it expects.
> > > > > >
> > > > > >
> > > > > > Joel Bernstein
> > > > > > http://joelsolr.blogspot.com/
> > > > > >
> > > > > > On Sat, May 13, 2017 at 12:28 PM, Susmit Shukla <shukla.sus...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Joel,
> > > > > > >
> > > > > > > I did not observe that. On calling close() on the stream, it
> > > > > > > cycled through all the hits that the /export handler calculated.
> > > > > > > e.g. with a *:* query and the export handler on a 100k document
> > > > > > > index, I could see the 100kth record printed on the http wire
> > > > > > > debug log although close() was called after reading the 1st
> > > > > > > tuple. The time taken for the operation with the close() call was
> > > > > > > the same as if I had read all 100k tuples.
> > > > > > > As I have pointed out, close() on the underlying
> > > > > > > ChunkedInputStream calls read(), and the solr server probably has
> > > > > > > no way to distinguish it from the read() of regular tuple reads.
> > > > > > > I think there should be an abort() API for solr streams that
> > > > > > > hooks into httpmethod.abort(). That would enable the client to
> > > > > > > disconnect early, and that would probably disconnect the
> > > > > > > underlying socket, so there would be no leaks.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Susmit
> > > > > > >
> > > > > > >
> > > > > > > On Sat, May 13, 2017 at 7:42 AM, Joel Bernstein <joels...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > If the client closes the connection to the export handler then
> > > > > > > > this exception will occur automatically on the server.
> > > > > > > >
> > > > > > > > Joel Bernstein
> > > > > > > > http://joelsolr.blogspot.com/
> > > > > > > >
> > > > > > > > On Sat, May 13, 2017 at 1:46 AM, Susmit Shukla <shukla.sus...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Joel,
> > > > > > > > >
> > > > > > > > > Thanks for the insight. How can this exception be
> > > > > > > > > thrown/forced from the client side? The client can't do a
> > > > > > > > > System.exit() as it is running as a webapp.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Susmit
> > > > > > > > >
> > > > > > > > > On Fri, May 12, 2017 at 4:44 PM, Joel Bernstein <joels...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > In this scenario the /export handler continues to export
> > > > > > > > > > results until it encounters a "Broken Pipe" exception. This
> > > > > > > > > > exception is trapped and ignored rather than logged, as
> > > > > > > > > > it's not considered an exception if the client disconnects
> > > > > > > > > > early.
> > > > > > > > > >
> > > > > > > > > > Joel Bernstein
> > > > > > > > > > http://joelsolr.blogspot.com/
> > > > > > > > > >
> > > > > > > > > > On Fri, May 12, 2017 at 2:10 PM, Susmit Shukla <shukla.sus...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I have a question regarding the solr /export handler.
> > > > > > > > > > > Here is the scenario -
> > > > > > > > > > > I want to use the /export handler - I only need sorted
> > > > > > > > > > > data and this is the fastest way to get it. I am doing
> > > > > > > > > > > multiple-level joins using streams over the /export
> > > > > > > > > > > handler. I know the number of top-level records to be
> > > > > > > > > > > retrieved, but not for each individual stream rolling up
> > > > > > > > > > > to the final result.
> > > > > > > > > > > I observed that calling close() on a /export stream is
> > > > > > > > > > > too expensive. It reads the stream to the very end of the
> > > > > > > > > > > hits. Assuming there are 100 million hits for each
> > > > > > > > > > > stream, if the first 1k records were found after the
> > > > > > > > > > > joins and we call close() after that, it would take many
> > > > > > > > > > > minutes/hours to finish.
> > > > > > > > > > > Currently I have put the close() call in a different
> > > > > > > > > > > thread - basically fire and forget. But the cluster is
> > > > > > > > > > > very strained because of the unnecessary reads.
> > > > > > > > > > >
> > > > > > > > > > > Internally, streaming uses HttpClient's
> > > > > > > > > > > ChunkedInputStream and it has to be drained in the
> > > > > > > > > > > close() call. But from the server's point of view, it
> > > > > > > > > > > should stop sending more data once close() has been
> > > > > > > > > > > issued.
> > > > > > > > > > > There is a read() call in the close() method of
> > > > > > > > > > > ChunkedInputStream that is indistinguishable from a real
> > > > > > > > > > > read(). If the /export handler stopped sending more data
> > > > > > > > > > > after close(), it would be very useful.
> > > > > > > > > > >
> > > > > > > > > > > Another option would be to use the /select handler and
> > > > > > > > > > > get into the business of managing a custom cursor mark
> > > > > > > > > > > that is based on the stream sort and is reset until it
> > > > > > > > > > > fetches the required records at the topmost level.
> > > > > > > > > > >
> > > > > > > > > > > Any thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Susmit
> > > > > > > > > > >
