Hello,

First of all, I have had the same feeling for some time. I checked TestThreaded <http://svn.apache.org/repos/asf/lucene/dev/tags/lucene_solr_3_4_0/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java> and found that it doesn't cover CachedSqlEntityProcessor with the fancy where="xid=x.id" feature.

Please find a test for it attached: dih-mt-cached-race-condition-TestThreaded.java.patch (a patch for TestThreaded from tags/lucene_solr_3_4_0).

Most of the time testCachedMultiThread_FullImport() fails (it passes only rarely), while testCachedSingleThread_FullImport() is always green. The only difference between the two is one thread versus two.

Please also find attached:

dih-mt-cached-race-condition-TestThreaded.log contains the FINEST messages, including what I guess is the root cause:

2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null

(Sorry about the non-Latin characters, I don't know where they come from; it should just say FINEST.) The log also shows the corrupted documents, where some of the child entity fields are missing.

dih-mt-cached-race-condition-TestThreaded.test.out is the actual test failure (sorry, it comes from a different run than the .log). The failing assert is the one that searches for root entities by the children's terms.

I'm not sure, but creating child entity runners in multiple threads looks really suspicious to me. I haven't had time to look deeper into the code.

PS. I want to contribute two bleeding-edge ideas (with implementations) that improve DIH a lot, but this race condition is a blocker for them.

Regards

On Mon, Oct 3, 2011 at 10:09 PM, Maria Vazquez <maria.vazq...@dexone.com> wrote:

> When I'm debugging, if it is single threaded
> CachedSqlEntityProcessor.getAllNonCachedRows is called only once, all the
> rows are cached, and the next time it requests a row it gets it from the
> cached data.
> In the logs I see the SQL call only once.
>
> If I use multiple threads, it calls
> CachedSqlEntityProcessor.getAllNonCachedRows multiple times, so it creates
> a new cache per thread.
>
> The cache should be shared among threads and be safe, right?
>
> Thanks!
> Maria
>
> On 10/1/11 8:00 PM, "pulkitsing...@gmail.com" <pulkitsing...@gmail.com>
> wrote:
>
> > What part of the source code in debug mode behaved in a fashion such as
> > to make it seem like it is not thread-safe?
> >
> > If it feels difficult to put into words then you can always make a small
> > 5 min screencast to demo the issue and talk about it. I do that for
> > really complex stuff with Jing by techsmith (free version).
> >
> > Or you can put up a small and simplified test case together to demo the
> > issue and then paste the link for that hosted attachment :)
> >
> > Sent from my iPhone
> >
> > On Sep 30, 2011, at 1:28 PM, Maria Vazquez <maria.vazq...@dexone.com>
> > wrote:
> >
> >> Hi,
> >> I'm using threads with JdbcDataStore and CachedSqlEntityProcessor.
> >> I noticed that if I make it single threaded CachedSqlEntityProcessor
> >> behaves as expected (it only queries the db once and caches all the
> >> rows). If I make it multi threaded it seems to make multiple db queries
> >> and when I debug the source code it looks like it is not thread safe.
> >> Any ideas?
> >> Thanks,
> >> Maria

--
Sincerely yours
Mikhail (Mike) Khludnev
Developer
Grid Dynamics
tel. 1-415-738-8644
Skype: mkhludnev
<http://www.griddynamics.com>
<mkhlud...@griddynamics.com>

Index: solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java
===================================================================
--- solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java (revision 1180097)
+++ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestThreaded.java (working copy)
@@ -19,13 +19,16 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 
 public class TestThreaded extends AbstractDataImportHandlerTestCase {
-  @BeforeClass
+
+@BeforeClass
   public static void beforeClass() throws Exception {
     initCore("dataimport-solrconfig.xml", "dataimport-schema.xml");
   }
@@ -57,6 +60,46 @@
     assertQ(req("desc:hello"), "//*[@numFound='4']");
   }
 
+  @Test
+  public void testCachedSingleThread_FullImport() throws Exception {
+    testCached(dataCachedSingleThConfig);
+  }
+
+  @Test
+  public void testCachedMultiThread_FullImport() throws Exception {
+    testCached(dataCachedConfig);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testCached(String config) throws Exception {
+    List<Map> parentRow = new ArrayList();
+//    parentRow.add(createMap("id", "1"));
+    parentRow.add(createMap("id", "2"));
+    parentRow.add(createMap("id", "3"));
+    parentRow.add(createMap("id", "4"));
+    parentRow.add(createMap("id", "1"));
+    MockDataSource.setIterator("select * from x", (Iterator) parentRow.iterator());
+
+    List childRow = new ArrayList();
+    for(Map row : parentRow){
+      for(int i=0;i<4;i++){
+        childRow.add(createMap("xid", row.get("id"),
+            "desc", Integer.toString(i)));
+      }
+    }
+
+    MockDataSource.setIterator("select * from y", childRow.iterator());
+
+    runFullImport(config);
+
+    assertQ(req("id:1"), "//*[@numFound='1']");
+    assertQ(req("*:*"), "//*[@numFound='4']");
+    assertQ(req("desc:0"), "//*[@numFound='4']");
+    assertQ(req("desc:1"), "//*[@numFound='4']");
+    assertQ(req("desc:2"), "//*[@numFound='4']");
+    assertQ(req("desc:3"), "//*[@numFound='4']");
+  }
+
   private static String dataConfig = "<dataConfig>\n"
           +"<dataSource type=\"MockDataSource\"/>\n"
           + " <document>\n"
@@ -66,4 +109,20 @@
           + " <field column=\"desc\" />\n"
           + " </entity>\n" + " </entity>\n"
           + " </document>\n" + "</dataConfig>";
+
+  private static final String threads2 = "threads=\"2\"";
+
+  private static String dataCachedConfig = "<dataConfig>\n"
+          +"<dataSource type=\"MockDataSource\"/>\n"
+          + " <document>\n"
+          + " <entity name=\"x\" "+threads2+" query=\"select * from x\" deletedPkQuery=\"select id from x where last_modified > NOW AND deleted='true'\" deltaQuery=\"select id from x where last_modified > NOW\" " + "processor=\"CachedSqlEntityProcessor\""+ ">\n"
+          + " <field column=\"id\" />\n"
+          + " <entity name=\"y\" query=\"select * from y\" where=\"xid=x.id\" " + "processor=\"CachedSqlEntityProcessor\">\n"
+          + " <field column=\"desc\" />\n"
+          + " </entity>\n" + " </entity>\n"
+          + " </document>\n" + "</dataConfig>";
+
+  private static String dataCachedSingleThConfig = Pattern.compile(threads2).matcher(dataCachedConfig).replaceAll("threads=\"1\"");
 }

2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder doFullDump
情報: running multithreaded full-import
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=3}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={3}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={3}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=0, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=1, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=2, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=3, xid=2}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={2}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={2}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {id=1}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={1}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={1}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=0, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=1, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=2, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : {desc=3, xid=4}
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.ThreadedEntityProcessorWrapper nextRow
詳細レベル (低): arow : null
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): a row on docrootSolrInputDocument[{id=id(1.0)={4}, desc=desc(1.0)={[0, 1, 2, 3]}}]
2011/10/08 0:01:13 org.apache.solr.handler.dataimport.DocBuilder$EntityRunner runAThread
詳細レベル (低): adding a doc SolrInputDocument[{id=id(1.0)={4}, desc=desc(1.0)={[0, 1, 2, 3]}}]
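
To make the expectation quoted in the thread concrete ("the cache should be shared among threads and be safe"), here is a minimal sketch of a row cache that is loaded once and then shared by all worker threads. It is not DIH code and not a proposed fix: SharedRowCache, lookup and demo are hypothetical names, and the loader simply stands in for the single SQL call that the single-threaded run makes. The demo data mirrors the test in the patch (parents 1..4, four child rows each, two threads).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch, NOT DataImportHandler code: a row cache shared by all worker threads.
class SharedRowCache {
  // query -> (join key -> rows). Each per-query index is built once and never mutated afterwards.
  private final ConcurrentHashMap<String, Map<Object, List<Map<String, Object>>>> byQuery =
      new ConcurrentHashMap<>();

  // The loader stands in for the SQL call. It runs at most once per query string because
  // ConcurrentHashMap.computeIfAbsent applies the mapping function atomically per key.
  List<Map<String, Object>> lookup(String query, String keyColumn, Object key,
                                   Supplier<List<Map<String, Object>>> loader) {
    Map<Object, List<Map<String, Object>>> index = byQuery.computeIfAbsent(query, q -> {
      Map<Object, List<Map<String, Object>>> m = new HashMap<>();
      for (Map<String, Object> row : loader.get()) {
        m.computeIfAbsent(row.get(keyColumn), k -> new ArrayList<>()).add(row);
      }
      return m;
    });
    List<Map<String, Object>> rows = index.get(key);
    return rows == null
        ? Collections.<Map<String, Object>>emptyList()
        : Collections.unmodifiableList(rows);
  }

  // Runs four lookups on two threads and returns
  // {number of loader calls, number of child rows found for xid=2}.
  static int[] demo() {
    AtomicInteger loads = new AtomicInteger();
    Supplier<List<Map<String, Object>>> loader = () -> {
      loads.incrementAndGet();
      List<Map<String, Object>> rows = new ArrayList<>();
      for (int id = 1; id <= 4; id++) {
        for (int i = 0; i < 4; i++) {
          Map<String, Object> row = new HashMap<>();
          row.put("xid", Integer.toString(id));
          row.put("desc", Integer.toString(i));
          rows.add(row);
        }
      }
      return rows;
    };
    SharedRowCache cache = new SharedRowCache();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    for (int id = 1; id <= 4; id++) {
      String key = Integer.toString(id);
      pool.execute(() -> cache.lookup("select * from y", "xid", key, loader));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {loads.get(), cache.lookup("select * from y", "xid", "2", loader).size()};
  }

  public static void main(String[] args) {
    int[] r = demo();
    System.out.println("loader calls: " + r[0] + ", rows for xid=2: " + r[1]);
  }
}
```

With a shared cache like this, the loader count stays at one no matter how many threads ask, which is the behaviour the single-threaded run shows in the logs. Whether DIH should share the cache this way, or keep one cache per entity runner and fix the runner creation instead, is a separate design question that this sketch does not answer.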