Author: davsclaus Date: Sat Nov 6 14:29:27 2010 New Revision: 1032071 URL: http://svn.apache.org/viewvc?rev=1032071&view=rev Log: CAMEL-3318: Applied patch with thanks to Steve Bate.
Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1032071&r1=1032070&r2=1032071&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Sat Nov 6 14:29:27 2010 @@ -25,6 +25,10 @@ import org.apache.camel.impl.DefaultComp import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import quickfix.LogFactory; +import quickfix.MessageFactory; +import quickfix.MessageStoreFactory; + public class QuickfixjComponent extends DefaultComponent { private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class); @@ -32,6 +36,10 @@ public class QuickfixjComponent extends private final Map<String, QuickfixjEngine> engines = new HashMap<String, QuickfixjEngine>(); private final Map<String, QuickfixjEndpoint> endpoints = new HashMap<String, QuickfixjEndpoint>(); + private MessageStoreFactory messageStoreFactory; + private LogFactory logFactory; + private MessageFactory messageFactory; + @Override protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { // Look up the engine instance based on the settings file ("remaining") @@ -43,7 +51,7 @@ public class QuickfixjComponent extends engine = engines.get(remaining); if (engine == null) { LOG.info("Creating QuickFIX/J engine using settings: " + remaining); - engine = new QuickfixjEngine(remaining, false); + engine = new QuickfixjEngine(remaining, false, messageStoreFactory, logFactory, messageFactory); engines.put(remaining, engine); if (isStarted()) { startQuickfixjEngine(engine); @@ -90,4 +98,16 @@ public class QuickfixjComponent extends Map<String, QuickfixjEngine> getEngines() { return Collections.unmodifiableMap(engines); } + + public void setMessageFactory(MessageFactory messageFactory) { + this.messageFactory = messageFactory; + } + + public void setLogFactory(LogFactory logFactory) { + this.logFactory = logFactory; + } + + public void setMessageStoreFactory(MessageStoreFactory messageStoreFactory) { + this.messageStoreFactory = messageStoreFactory; + } } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1032071&r1=1032070&r2=1032071&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Sat Nov 6 14:29:27 2010 @@ -52,6 +52,7 @@ import quickfix.Message; import quickfix.MessageFactory; import quickfix.MessageStoreFactory; import quickfix.RejectLogon; +import quickfix.SLF4JLogFactory; import quickfix.ScreenLogFactory; import quickfix.Session; import quickfix.SessionFactory; @@ -100,7 +101,16 @@ public class QuickfixjEngine { ThreadPerConnector, ThreadPerSession; } - public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown) throws ConfigError, FieldConvertError, IOException, JMException { + public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown) + throws ConfigError, FieldConvertError, IOException, JMException { + + this(settingsResourceName, forcedShutdown, null, null, null); + } + + public QuickfixjEngine(String settingsResourceName, boolean forcedShutdown, + MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride, MessageFactory messageFactoryOverride) + throws ConfigError, FieldConvertError, IOException, JMException { + this.forcedShutdown = forcedShutdown; this.settingsResourceName = settingsResourceName; @@ -111,10 +121,17 @@ public class QuickfixjEngine { SessionSettings settings = new SessionSettings(inputStream); - // TODO Make the message factory configurable for advanced users - messageFactory = new DefaultMessageFactory(); - sessionLogFactory = inferLogFactory(settings); - messageStoreFactory = inferMessageStoreFactory(settings); + messageFactory = messageFactoryOverride != null + ? messageFactoryOverride + : new DefaultMessageFactory(); + + sessionLogFactory = sessionLogFactoryOverride != null + ? sessionLogFactoryOverride + : inferLogFactory(settings); + + messageStoreFactory = messageStoreFactoryOverride != null + ? messageStoreFactoryOverride + : inferMessageStoreFactory(settings); // Set default session schedule if not specified in configuration if (!settings.isSetting(Session.SETTING_START_TIME)) { @@ -276,6 +293,7 @@ public class QuickfixjEngine { isFileLog(settings, impliedLogFactories); isScreenLog(settings, impliedLogFactories); isJdbcLog(settings, impliedLogFactories); + isSL4JLog(settings, impliedLogFactories); if (impliedLogFactories.size() > 1) { throw new ConfigError("Ambiguous log factory implied in configuration"); } @@ -310,6 +328,17 @@ public class QuickfixjEngine { } } + private void isSL4JLog(SessionSettings settings, Set<LogFactory> impliedLogFactories) { + if (impliedLogFactories.size() == 0) { + for (Object key : settings.getDefaultProperties().keySet()) { + if (key.toString().startsWith("SLF4J")) { + impliedLogFactories.add(new SLF4JLogFactory(settings)); + return; + } + } + } + } + private boolean isConnectorRole(SessionSettings settings, String connectorRole) throws ConfigError { boolean hasRole = false; Iterator<SessionID> sessionIdItr = settings.sectionIterator(); Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java?rev=1032071&r1=1032070&r2=1032071&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java Sat Nov 6 14:29:27 2010 @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Method; +import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.util.concurrent.CountDownLatch; @@ -42,8 +43,14 @@ import org.junit.Before; import org.junit.Test; import quickfix.Acceptor; +import quickfix.DefaultMessageFactory; import quickfix.FixVersions; import quickfix.Initiator; +import quickfix.LogFactory; +import quickfix.MemoryStoreFactory; +import quickfix.MessageFactory; +import quickfix.MessageStoreFactory; +import quickfix.ScreenLogFactory; import quickfix.Session; import quickfix.SessionFactory; import quickfix.SessionID; @@ -68,7 +75,10 @@ public class QuickfixjComponentTest { private SessionID sessionID; private SessionSettings settings; private QuickfixjComponent component; - + private MessageFactory engineMessageFactory; + private MessageStoreFactory engineMessageStoreFactory; + private LogFactory engineLogFactory; + private void setSessionID(SessionSettings sessionSettings, SessionID sessionID) { sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, sessionID.getBeginString()); sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, sessionID.getSenderCompID()); @@ -88,10 +98,6 @@ public class QuickfixjComponentTest { settingsFile = File.createTempFile("quickfixj_test_", ".cfg"); tempdir = settingsFile.getParentFile(); URL[] urls = new URL[] {tempdir.toURI().toURL()}; - - contextClassLoader = Thread.currentThread().getContextClassLoader(); - ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader); - Thread.currentThread().setContextClassLoader(testClassLoader); sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "FOO", "BAR"); @@ -101,9 +107,30 @@ public class QuickfixjComponentTest { settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false); setSessionID(settings, sessionID); + contextClassLoader = Thread.currentThread().getContextClassLoader(); + ClassLoader testClassLoader = new URLClassLoader(urls, contextClassLoader); + Thread.currentThread().setContextClassLoader(testClassLoader); + } + + private void setUpComponent() throws IOException, MalformedURLException, NoSuchMethodException { + setUpComponent(false); + } + + private void setUpComponent(boolean injectQfjPlugins) throws IOException, MalformedURLException, NoSuchMethodException { DefaultCamelContext camelContext = new DefaultCamelContext(); component = new QuickfixjComponent(); component.setCamelContext(camelContext); + + if (injectQfjPlugins) { + engineMessageFactory = new DefaultMessageFactory(); + engineMessageStoreFactory = new MemoryStoreFactory(); + engineLogFactory = new ScreenLogFactory(); + + component.setMessageFactory(engineMessageFactory); + component.setMessageStoreFactory(engineMessageStoreFactory); + component.setLogFactory(engineLogFactory); + } + assertThat(component.getEngines().size(), is(0)); Method converterMethod = QuickfixjConverters.class.getMethod("toSessionID", new Class<?>[] {String.class}); @@ -113,11 +140,15 @@ public class QuickfixjComponentTest { @After public void tearDown() throws Exception { Thread.currentThread().setContextClassLoader(contextClassLoader); - component.stop(); + if (component != null) { + component.stop(); + } } @Test public void createEndpointBeforeComponentStart() throws Exception { + setUpComponent(); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); @@ -147,6 +178,8 @@ public class QuickfixjComponentTest { @Test public void createEndpointAfterComponentStart() throws Exception { + setUpComponent(); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); @@ -171,6 +204,8 @@ public class QuickfixjComponentTest { @Test public void componentStop() throws Exception { + setUpComponent(); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); @@ -204,6 +239,8 @@ public class QuickfixjComponentTest { @Test public void messagePublication() throws Exception { + setUpComponent(); + // Create settings file with both acceptor and initiator SessionSettings settings = new SessionSettings(); @@ -268,6 +305,27 @@ public class QuickfixjComponentTest { assertTrue("Messages not received", messageLatch.await(5000, TimeUnit.MILLISECONDS)); } + @Test + public void userSpecifiedQuickfixjPlugins() throws Exception { + setUpComponent(true); + + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); + + writeSettings(); + + component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); + + component.start(); + + assertThat(component.getEngines().size(), is(1)); + QuickfixjEngine engine = component.getEngines().values().iterator().next(); + + assertThat(engine.getMessageFactory(), is(engineMessageFactory)); + assertThat(engine.getMessageStoreFactory(), is(engineMessageStoreFactory)); + assertThat(engine.getLogFactory(), is(engineLogFactory)); + } + private void writeSettings() throws IOException { writeSettings(settings); } Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java?rev=1032071&r1=1032070&r2=1032071&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjEngineTest.java Sat Nov 6 14:29:27 2010 @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import javax.management.JMException; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -34,6 +35,8 @@ import org.apache.mina.common.TransportT import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; + import quickfix.Acceptor; import quickfix.ConfigError; import quickfix.DefaultMessageFactory; @@ -46,8 +49,12 @@ import quickfix.Initiator; import quickfix.JdbcLogFactory; import quickfix.JdbcSetting; import quickfix.JdbcStoreFactory; +import quickfix.LogFactory; import quickfix.MemoryStoreFactory; import quickfix.Message; +import quickfix.MessageFactory; +import quickfix.MessageStoreFactory; +import quickfix.SLF4JLogFactory; import quickfix.ScreenLogFactory; import quickfix.Session; import quickfix.SessionFactory; @@ -71,7 +78,6 @@ import static org.junit.Assert.assertTha import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - public class QuickfixjEngineTest { private File settingsFile; private ClassLoader contextClassLoader; @@ -287,6 +293,23 @@ public class QuickfixjEngineTest { } @Test + public void inferSlf4jLog() throws Exception { + settings.setString(SLF4JLogFactory.SETTING_EVENT_CATEGORY, "Events"); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + + writeSettings(); + + quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false); + + assertThat(quickfixjEngine.getInitiator(), notNullValue()); + assertThat(quickfixjEngine.getAcceptor(), nullValue()); + assertThat(quickfixjEngine.getSettingsResourceName(), is(settingsFile.getName())); + assertThat(quickfixjEngine.getMessageStoreFactory(), instanceOf(MemoryStoreFactory.class)); + assertThat(quickfixjEngine.getLogFactory(), instanceOf(SLF4JLogFactory.class)); + assertThat(quickfixjEngine.getMessageFactory(), instanceOf(DefaultMessageFactory.class)); + } + + @Test public void ambiguousLog() throws Exception { settings.setString(FileLogFactory.SETTING_FILE_LOG_PATH, tempdir.toString()); settings.setBool(ScreenLogFactory.SETTING_LOG_EVENTS, true); @@ -307,6 +330,24 @@ public class QuickfixjEngineTest { } @Test + public void useExplicitComponentImplementations() throws Exception { + settings.setString(SLF4JLogFactory.SETTING_EVENT_CATEGORY, "Events"); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + + writeSettings(); + + MessageStoreFactory messageStoreFactory = Mockito.mock(MessageStoreFactory.class); + LogFactory logFactory = Mockito.mock(LogFactory.class); + MessageFactory messageFactory = Mockito.mock(MessageFactory.class); + + quickfixjEngine = new QuickfixjEngine(settingsFile.getName(), false, messageStoreFactory, logFactory, messageFactory); + + assertThat(quickfixjEngine.getMessageStoreFactory(), is(messageStoreFactory)); + assertThat(quickfixjEngine.getLogFactory(), is(logFactory)); + assertThat(quickfixjEngine.getMessageFactory(), is(messageFactory)); + } + + @Test public void enableJmxForInitiator() throws Exception { settings.setBool(QuickfixjEngine.SETTING_USE_JMX, true); settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE);