Hi Joel, Susmit, I created https://issues.apache.org/jira/browse/SOLR-10698 to track the issue.
@Susmit, looking at the stack trace I see the expression is using JSONTupleStream. I wonder if you tried using JavabinTupleStreamParser, could it help improve performance?

On Tue, May 16, 2017 at 9:39 AM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi Joel,

Queries can be arbitrarily nested with AND/OR/NOT joins, e.g. intersect(intersect(search, search), union(search, search)). If I cut off the innermost stream with a limit, the complete intersection would not happen at the upper levels. Also, would the limit stream have the same effect as using the /select handler with the rows parameter? I am trying to force the input stream to close through reflection, just to see if it gives performance gains.

2) I would experiment with null streams. Is workers = number of replicas in the data collection a good rule of thumb? Is ParallelStream performance upper-bounded by the number of replicas?

Thanks,
Susmit

On Tue, May 16, 2017 at 5:59 AM, Joel Bernstein <joels...@gmail.com> wrote:

Your approach looks OK. The single-sharded worker collection is only needed if you were using CloudSolrStream to send the initial Streaming Expression to the /stream handler. You are not doing this, so your approach is fine.

Here are some thoughts on what you described:

1) If you are closing the parallel stream after the top 1000 results, then try wrapping the intersect in a LimitStream. This stream doesn't exist yet, so it will be a custom stream. The LimitStream can return the EOF tuple after it reads N tuples. This will cause the worker nodes to close the underlying stream and cause the Broken Pipe exception to occur at the /export handler, which will stop the /export.

Here is the basic approach:

parallel(limit(intersect(search, search)))

2) It can be tricky to understand where the bottleneck lies when using the ParallelStream for parallel relational algebra. You can use the NullStream to get an understanding of why performance is not increasing when you increase the workers. Here is the basic approach:

parallel(null(intersect(search, search)))

The NullStream will eat all the tuples on the workers and return a single tuple with the tuple count and the time taken to run the expression, so you'll get one tuple from each worker. This eliminates any bottleneck on tuples returning through the ParallelStream and lets you focus on the performance of the intersect and the /export handler.

Then experiment with:

1) Increasing the number of parallel workers.
2) Increasing the number of replicas in the data collections.

And watch the timing information coming back from the NullStream tuples. If increasing the workers is not improving performance, the bottleneck may be in the /export handler, so try increasing replicas and see if that improves performance. Different partitions of the streams will be served by different replicas.

If performance doesn't improve with the NullStream after increasing both workers and replicas, then we know the bottleneck is the network.

Joel Bernstein
http://joelsolr.blogspot.com/
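A minimal sketch of what such a custom LimitStream decorator could look like when embedding streams with SolrJ. This class does not ship with Solr 6.3, and the Expressible plumbing needed to register it with a StreamFactory (so it can be used inside a streaming expression) is omitted:

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.Explanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;

// Decorator that emits at most 'limit' tuples from the inner stream and then
// returns an EOF tuple, so the consumer stops reading after N tuples instead
// of draining the full result set.
public class LimitStream extends TupleStream {

  private final TupleStream stream;
  private final int limit;
  private int count;

  public LimitStream(TupleStream stream, int limit) {
    this.stream = stream;
    this.limit = limit;
  }

  public void open() throws IOException { stream.open(); }
  public void close() throws IOException { stream.close(); }
  public void setStreamContext(StreamContext context) { stream.setStreamContext(context); }
  public List<TupleStream> children() { return Collections.singletonList(stream); }
  public StreamComparator getStreamSort() { return stream.getStreamSort(); }

  public Explanation toExplanation(StreamFactory factory) throws IOException {
    return new StreamExplanation(getStreamNodeId().toString())
        .withFunctionName("limit")
        .withImplementingClass(getClass().getName())
        .withExpressionType(Explanation.ExpressionType.STREAM_DECORATOR)
        .withExpression("limit(...)");
  }

  public Tuple read() throws IOException {
    if (++count > limit) {
      // Manufacture the EOF tuple that signals end-of-stream to the consumer.
      Map<String, Object> eof = new HashMap<>();
      eof.put("EOF", true);
      return new Tuple(eof);
    }
    return stream.read();
  }
}

Used as parallel(limit(intersect(search, search))), each worker would stop reading from the /export handlers once the limit is reached, which is what triggers the Broken Pipe on the server side described above.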
On Mon, May 15, 2017 at 10:37 PM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi Joel,

Regarding the implementation, I am wrapping the topmost TupleStream in a ParallelStream and executing it on the worker cluster (one of the joined clusters doubles up as the worker cluster). ParallelStream does submit the query to the /stream handler.

For #2, I am, for example, creating 2 CloudSolrStreams, wrapping them in an IntersectStream, wrapping that in a ParallelStream, and reading out the tuples from the parallel stream. close() is called on the ParallelStream. I do have custom streams, but they are similar to IntersectStream.

I am on Solr 6.3.1.

The 2 Solr clusters serving the join queries have many shards. The worker collection is also multi-sharded and is one of the collections from the main clusters. So do you imply I should be using a single-sharded "worker" collection? Would the joins execute faster?

On a side note, increasing the workers beyond 1 was not improving the execution times, and was degrading them with 3 workers and above. That is counterintuitive, since the joins are huge and putting more workers on them should have improved performance.

Thanks,
Susmit

On Mon, May 15, 2017 at 6:47 AM, Joel Bernstein <joels...@gmail.com> wrote:

Ok, please do report any issues you run into. This is quite a good bug report.

I reviewed the code and I believe I see the problem. The problem seems to be that the output code from the /stream handler is not properly accounting for client disconnects and closing the underlying stream. What I see in the code is that exceptions coming from read() in the stream do automatically close the underlying stream, but exceptions from the writing of the stream do not close the stream. This needs to be fixed.

A few questions about your streaming implementation:

1) Are you sending requests to the /stream handler? Or are you embedding CloudSolrStream in your application and bypassing the /stream handler?

2) If you're sending Streaming Expressions to the /stream handler, are you using SolrStream or CloudSolrStream to send the expression?

3) What version of Solr are you using?

4) Have you implemented any custom streams?

#2 is an important question. If you're sending expressions to the /stream handler using CloudSolrStream, the collection running the expression would have to be set up a specific way: it will have to be a *single shard collection*. You can have as many replicas as you want, but only one shard. That's because CloudSolrStream picks one replica in each shard to forward the request to and then merges the results from the shards. So if you send in an expression using CloudSolrStream, that expression will be sent to each shard to be run, and each shard will duplicate the work and return duplicate results.

Joel Bernstein
http://joelsolr.blogspot.com/
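For reference, a stripped-down sketch of the embedding pattern Susmit describes above, without the ParallelStream wrapper. It assumes Solr 6.x SolrJ (the Map-based constructors shown here were later superseded by SolrParams variants); the zkHost, collection names, queries, and the join field "id" are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.IntersectStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;

public class EmbeddedIntersectExample {
  public static void main(String[] args) throws Exception {
    String zkHost = "zk1:2181,zk2:2181,zk3:2181/solr"; // placeholder

    // Both sides must be sorted on the join key and routed to the /export handler.
    Map<String, String> left = new HashMap<>();
    left.put("q", "*:*");
    left.put("fl", "id");
    left.put("sort", "id asc");
    left.put("qt", "/export");

    Map<String, String> right = new HashMap<>(left);
    right.put("q", "field_s:value");

    TupleStream streamA = new CloudSolrStream(zkHost, "collectionA", left);
    TupleStream streamB = new CloudSolrStream(zkHost, "collectionB", right);
    TupleStream intersect = new IntersectStream(streamA, streamB, new FieldEqualitor("id"));

    StreamContext context = new StreamContext();
    SolrClientCache cache = new SolrClientCache();
    context.setSolrClientCache(cache);
    intersect.setStreamContext(context);

    try {
      intersect.open();
      Tuple tuple;
      while (!(tuple = intersect.read()).EOF) {
        System.out.println(tuple.getString("id"));
      }
    } finally {
      // close() drains the underlying /export responses; this is the expensive
      // step discussed in this thread when only a prefix of the results is needed.
      intersect.close();
      cache.close();
    }
  }
}

Wrapping the intersect in a ParallelStream additionally requires a StreamFactory with the search/intersect/parallel functions registered, since the wrapped stream is serialized back into an expression and sent to the worker collection.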
On Sat, May 13, 2017 at 7:03 PM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Thanks Joel. Streaming is awesome, I just had a huge implementation of it in my project. I found a couple more issues with streaming and did local hacks for them; I will raise those too.

On Sat, May 13, 2017 at 2:09 PM, Joel Bernstein <joels...@gmail.com> wrote:

Ah, then this is unexpected behavior. Can you open a ticket for this?

Joel Bernstein
http://joelsolr.blogspot.com/

On Sat, May 13, 2017 at 2:51 PM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi Joel,

I was using CloudSolrStream for the above test. Below is the call stack.

at org.apache.http.impl.io.ChunkedInputStream.read(ChunkedInputStream.java:215)
at org.apache.http.impl.io.ChunkedInputStream.close(ChunkedInputStream.java:316)
at org.apache.http.impl.execchain.ResponseEntityProxy.streamClosed(ResponseEntityProxy.java:128)
at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
at sun.nio.cs.StreamDecoder.implClose(StreamDecoder.java:378)
at sun.nio.cs.StreamDecoder.close(StreamDecoder.java:193)
at java.io.InputStreamReader.close(InputStreamReader.java:199)
at org.apache.solr.client.solrj.io.stream.JSONTupleStream.close(JSONTupleStream.java:91)
at org.apache.solr.client.solrj.io.stream.SolrStream.close(SolrStream.java:186)

Thanks,
Susmit

On Sat, May 13, 2017 at 10:48 AM, Joel Bernstein <joels...@gmail.com> wrote:

I was just reading the Java docs on the ChunkedInputStream:

"Note that this class NEVER closes the underlying stream"

In that scenario the /export would indeed continue to send data. I think we can consider this an anti-pattern for the /export handler currently.

I would suggest using one of the Streaming Clients to connect to the /export handler. Either CloudSolrStream or SolrStream will interact with the /export handler in the way that it expects.

Joel Bernstein
http://joelsolr.blogspot.com/
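For comparison with a raw HttpClient request against /export, a minimal sketch of reading /export through SolrStream, pointed at a single replica's URL (again Solr 6.x SolrJ; the URL and fields are placeholders, and older 6.x releases take a Map instead of SolrParams):

import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.stream.SolrStream;
import org.apache.solr.common.params.ModifiableSolrParams;

public class ExportViaSolrStream {
  public static void main(String[] args) throws Exception {
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("q", "*:*");
    params.set("fl", "id");
    params.set("sort", "id asc");
    params.set("qt", "/export"); // route the request to the /export handler

    // Points at one replica; CloudSolrStream would instead discover replicas via ZooKeeper.
    SolrStream stream = new SolrStream("http://solr-host:8983/solr/collectionA", params);
    try {
      stream.open();
      Tuple tuple;
      while (!(tuple = stream.read()).EOF) {
        // process tuple
      }
    } finally {
      stream.close(); // as discussed in this thread, close() drains the full export
    }
  }
}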
On Sat, May 13, 2017 at 12:28 PM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi Joel,

I did not observe that. On calling close() on the stream, it cycled through all the hits that the /export handler calculated. E.g. with a *:* query and the export handler on a 100k document index, I could see the 100kth record printed in the http wire debug log, although close() was called after reading the 1st tuple. The time taken for the operation with the close() call was the same as if I had read all 100k tuples.

As I have pointed out, close() on the underlying ChunkedInputStream calls read(), and the Solr server probably has no way to distinguish it from a read() that happens for regular tuple reads. I think there should be an abort() API for Solr streams that hooks into httpMethod.abort(). That would enable the client to disconnect early and would probably disconnect the underlying socket, so there would be no leaks.

Thanks,
Susmit

On Sat, May 13, 2017 at 7:42 AM, Joel Bernstein <joels...@gmail.com> wrote:

If the client closes the connection to the export handler then this exception will occur automatically on the server.

Joel Bernstein
http://joelsolr.blogspot.com/

On Sat, May 13, 2017 at 1:46 AM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi Joel,

Thanks for the insight. How can this exception be thrown/forced from the client side? The client can't do a System.exit() as it is running as a webapp.

Thanks,
Susmit

On Fri, May 12, 2017 at 4:44 PM, Joel Bernstein <joels...@gmail.com> wrote:

In this scenario the /export handler continues to export results until it encounters a "Broken Pipe" exception. This exception is trapped and ignored rather than logged, as it's not considered an exception if the client disconnects early.

Joel Bernstein
http://joelsolr.blogspot.com/
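To illustrate the abort() idea Susmit proposes above at the HttpClient level (a hypothetical sketch, not an API that Solr's streaming clients currently expose; the URL is a placeholder):

import java.io.InputStream;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class AbortExportExample {
  public static void main(String[] args) throws Exception {
    CloseableHttpClient client = HttpClients.createDefault();
    HttpGet get = new HttpGet("http://solr-host:8983/solr/collectionA/export?q=*:*&fl=id&sort=id+asc");
    CloseableHttpResponse rsp = client.execute(get);
    InputStream in = rsp.getEntity().getContent();

    // ... read the first N tuples from 'in' ...

    // in.close() would drain the remaining chunks of the ChunkedInputStream,
    // which can take minutes for a huge export. Aborting the request instead
    // tears down the connection without reading to end-of-stream.
    get.abort();
    client.close();
  }
}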
On Fri, May 12, 2017 at 2:10 PM, Susmit Shukla <shukla.sus...@gmail.com> wrote:

Hi,

I have a question regarding the Solr /export handler. Here is the scenario: I want to use the /export handler, as I only need sorted data and this is the fastest way to get it. I am doing multi-level joins over streams using the /export handler. I know the number of top-level records to be retrieved, but not the number for each individual stream rolling up to the final result.

I observed that calling close() on an /export stream is too expensive: it reads the stream to the very end of the hits. Assuming there are 100 million hits for each stream, the first 1k records are found after the joins, and we call close() after that, it would take many minutes or hours to finish. Currently I have put the close() call in a different thread, basically fire and forget, but the cluster is very strained because of the unnecessary reads.

Internally, streaming uses HttpClient's ChunkedInputStream, and it has to be drained in the close() call. But from the server's point of view, it should stop sending more data once close() has been issued. There is a read() call in the close() method of ChunkedInputStream that is indistinguishable from a real read(). If the /export handler stopped sending more data after close(), it would be very useful.

Another option would be to use the /select handler and get into the business of managing a custom cursor mark that is based on the stream sort and is reset until it fetches the required records at the topmost level.

Any thoughts?
Thanks,
Susmit
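The /select-plus-cursorMark alternative mentioned above would look roughly like this in SolrJ (a sketch; client construction varies across versions, and the zkHost, collection, sort field, and page size are placeholders):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorMarkExample {
  public static void main(String[] args) throws Exception {
    CloudSolrClient client = new CloudSolrClient.Builder().withZkHost("zk1:2181/solr").build();
    client.setDefaultCollection("collectionA");

    SolrQuery query = new SolrQuery("*:*");
    query.setFields("id");
    query.setRows(1000);
    // cursorMark requires a sort that includes the uniqueKey field.
    query.setSort(SolrQuery.SortClause.asc("id"));

    String cursorMark = CursorMarkParams.CURSOR_MARK_START;
    boolean done = false;
    while (!done) {
      query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
      QueryResponse rsp = client.query(query);
      for (SolrDocument doc : rsp.getResults()) {
        // process doc; stop early once enough joined records have been collected
      }
      String nextCursorMark = rsp.getNextCursorMark();
      done = cursorMark.equals(nextCursorMark); // no progress means we are finished
      cursorMark = nextCursorMark;
    }
    client.close();
  }
}

Unlike the /export case, stopping this loop early leaves nothing to drain, at the cost of managing the cursor and re-sorting per page on the server.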