Hello,

First of all, I've had the same feeling for some time.
I've checked
TestThreaded<http://svn.apache.org/repos/asf/lucene/dev/tags/lucene_solr_3_4_0/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java>
and found that it doesn't cover CachedSqlEntityProcessor with the fancy
where="xid=x.id" feature.
Please find a test for it attached:
dih-mt-cached-race-condition-TestThreaded.java.patch (a patch against
TestThreaded from tags/lucene_solr_3_4_0)

Most of the time testCachedMultiThread_FullImport() fails (it passes only
rarely), while testCachedSingleThread_FullImport() is always green. The only
difference between them is one thread versus two.
Please also find attached:

dih-mt-cached-race-condition-TestThreaded.log contains the FINEST messages,
including what I guess is the root cause:
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
(Sorry for the hieroglyphs, I don't know where they come from; it should be
just FINEST.)
It also shows the corrupted documents, in which some child entity fields are
missing.

dih-mt-cached-race-condition-TestThreaded.test.out is the actual test
failure (sorry, it comes from a different run than the .log). The failing
assert is the one that searches for root entities by their children's terms.

I'm not sure, but creating child entity runners in multiple threads looks
really suspicious to me. I haven't had time to look deeper into the code.
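
A minimal sketch of the behaviour one would expect from a cache that is
shared among threads, assuming the rows are loaded once and each lookup
returns its own view of the matching child rows. This is not DIH code:
SharedRowCache, RowSource, fetchAll and lookup are hypothetical names made up
for illustration, and whether the real failure comes from missing
synchronization of this kind is unconfirmed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not DIH code): child rows loaded once, shared by all threads. */
class SharedRowCache {

  /** Hypothetical stand-in for whatever executes "select * from y". */
  interface RowSource {
    List<Map<String, Object>> fetchAll();
  }

  private final RowSource source;
  private final String keyColumn;

  // Built lazily on the first lookup; guarded by the monitor of "this".
  private Map<Object, List<Map<String, Object>>> rowsByKey;

  SharedRowCache(RowSource source, String keyColumn) {
    this.source = source;
    this.keyColumn = keyColumn;
  }

  /**
   * Returns all cached rows whose key column equals the given value.
   * The method is synchronized, so only one thread builds the index and
   * every other thread sees the fully built index.
   */
  synchronized List<Map<String, Object>> lookup(Object key) {
    if (rowsByKey == null) {
      Map<Object, List<Map<String, Object>>> index = new HashMap<>();
      for (Map<String, Object> row : source.fetchAll()) {
        // Group the rows by the value of the key column, e.g. "xid".
        index.computeIfAbsent(row.get(keyColumn), k -> new ArrayList<>()).add(row);
      }
      rowsByKey = index;
    }
    List<Map<String, Object>> hit = rowsByKey.get(key);
    // A read-only list is returned instead of a shared iterator, so one
    // thread reading its rows cannot exhaust them for another thread.
    return hit == null
        ? Collections.<Map<String, Object>>emptyList()
        : Collections.unmodifiableList(hit);
  }
}
```

With where="xid=x.id", each parent row would call lookup(parentRow.get("id"))
and get its four child rows no matter which thread asks or in which order.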

PS. I want to contribute two bleeding-edge ideas (with implementations), which
improve DIH a lot, but this race condition is a blocker for them.

Regards

On Mon, Oct 3, 2011 at 10:09 PM, Maria Vazquez <maria.vazq...@dexone.com> wrote:

> When I'm debugging, if it is single threaded,
> CachedSqlEntityProcessor.getAllNonCachedRows is called only once, all the
> rows are cached, and the next time it requests a row it gets it from the
> cached data.
> In the logs I see the SQL call only once.
>
> If I use multiple threads, it calls
> CachedSqlEntityProcessor.getAllNonCachedRows multiple times, so it creates
> a new cache per thread.
>
> The cache should be shared among threads and be safe, right?
>
> Thanks!
> Maria
>
>
>
>
> On 10/1/11 8:00 PM, "pulkitsing...@gmail.com" <pulkitsing...@gmail.com>
> wrote:
>
> > What part of the source code in debug mode behaved in a fashion such as
> > to make it seem like it is not thread-safe?
> >
> > If it feels difficult to put into words then you can always make a small
> > 5 min screencast to demo the issue and talk about it. I do that for
> > really complex stuff with Jing by techsmith (free version).
> >
> > Or you can put up a small and simplified test case together to demo the
> > issue and then paste the link for that hosted attachment :)
> >
> > Sent from my iPhone
> >
> > On Sep 30, 2011, at 1:28 PM, Maria Vazquez <maria.vazq...@dexone.com> wrote:
> >
> >> Hi,
> >> I'm using threads with JdbcDataStore and CachedSqlEntityProcessor.
> >> I noticed that if I make it single threaded CachedSqlEntityProcessor
> >> behaves as expected (it only queries the db once and caches all the
> >> rows). If I make it multi threaded it seems to make multiple db queries
> >> and when I debug the source code it looks like it is not thread safe.
> >> Any ideas?
> >> Thanks,
> >> Maria
> >>
>
>


-- 
Sincerely yours
Mikhail (Mike) Khludnev
Developer
Grid Dynamics
tel. 1-415-738-8644
Skype: mkhludnev
<http://www.griddynamics.com>
 <mkhlud...@griddynamics.com>
Index: solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java
===================================================================
--- solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java	(revision 1180097)
+++ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java	(working copy)
@@ -19,13 +19,16 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 
 public class TestThreaded extends AbstractDataImportHandlerTestCase {
-  @BeforeClass
+
+  @BeforeClass
   public static void beforeClass() throws Exception {
     initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
   }
@@ -57,6 +60,46 @@
     assertQ(req("desc:hello"), "//*[@numFound='4']");
   }
 
+  @Test
+  public void testCachedSingleThread_FullImport() throws Exception {
+      testCached(dataCachedSingleThConfig);
+  }
+  
+  @Test
+  public void testCachedMultiThread_FullImport() throws Exception {
+      testCached(dataCachedConfig);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void testCached(String config) throws Exception {
+    List<Map> parentRow = new ArrayList();
+//    parentRow.add(createMap("id", "1"));
+    parentRow.add(createMap("id", "2"));
+    parentRow.add(createMap("id", "3"));
+    parentRow.add(createMap("id", "4"));
+    parentRow.add(createMap("id", "1"));
+    MockDataSource.setIterator("select * from x", (Iterator) parentRow.iterator());
+
+    List childRow = new ArrayList();
+    for(Map row : parentRow){
+        for(int i=0;i<4;i++){
+           childRow.add(createMap("xid", row.get("id"),
+                   "desc", Integer.toString(i)));
+        }
+    }
+
+    MockDataSource.setIterator("select * from y", childRow.iterator());
+
+    runFullImport(config);
+
+    assertQ(req("id:1"), "//*[@numFound='1']");
+    assertQ(req("*:*"), "//*[@numFound='4']");
+    assertQ(req("desc:0"), "//*[@numFound='4']");
+    assertQ(req("desc:1"), "//*[@numFound='4']");
+    assertQ(req("desc:2"), "//*[@numFound='4']");
+    assertQ(req("desc:3"), "//*[@numFound='4']");
+  }
+  
   private static String dataConfig = "<dataConfig>\n"
           +"<dataSource  type=\"MockDataSource\"/>\n"
           + "       <document>\n"
@@ -66,4 +109,20 @@
           + "                               <field column=\"desc\" />\n"
           + "                       </entity>\n" + "               </entity>\n"
           + "       </document>\n" + "</dataConfig>";
+
+  private static final String threads2 = "threads=\"2\"";
+  
+  private static String dataCachedConfig = "<dataConfig>\n"
+      +"<dataSource  type=\"MockDataSource\"/>\n"
+      + "       <document>\n"
+      + "               <entity name=\"x\" "+threads2+" query=\"select * from x\" deletedPkQuery=\"select id from x where last_modified > NOW AND deleted='true'\" deltaQuery=\"select id from x where last_modified > NOW\" " +
+                         "processor=\"CachedSqlEntityProcessor\""+ ">\n"
+      + "                       <field column=\"id\" />\n"
+      + "                       <entity name=\"y\" query=\"select * from y\" where=\"xid=x.id\" " +
+      "processor=\"CachedSqlEntityProcessor\">\n"
+      + "                               <field column=\"desc\" />\n"
+      + "                       </entity>\n" + "               </entity>\n"
+      + "       </document>\n" + "</dataConfig>";
+  
+  private static String dataCachedSingleThConfig = Pattern.compile(threads2).matcher(dataCachedConfig).replaceAll("threads=\"1\"");
 }
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder doFullDump
情報: running multithreaded full-import
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=3}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={3}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={3}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=0, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=1, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=2, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=3, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={2}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={2}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=1}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={1}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={1}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=0, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=1, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=2, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=3, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={4}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={4}, desc=desc(1.0)={[0, 1, 2, 3]}}]