Author: hadrian Date: Thu Jun 2 15:26:12 2011 New Revision: 1130604 URL: http://svn.apache.org/viewvc?rev=1130604&view=rev Log: CAMEL-4038. Original contributed patch applied with thanks to Steve. Checkstyle fixes to follow
Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java Modified: camel/trunk/components/camel-quickfix/pom.xml camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg Modified: camel/trunk/components/camel-quickfix/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/pom.xml?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/pom.xml (original) +++ camel/trunk/components/camel-quickfix/pom.xml Thu Jun 2 15:26:12 2011 @@ -67,12 +67,16 @@ <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jetty</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <scope>test</scope> </dependency> - <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java?rev=1130604&view=auto ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java (added) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java Thu Jun 2 15:26:12 2011 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj; + +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; + +import quickfix.Message; +import quickfix.SessionID; + +public class MessageCorrelator implements QuickfixjEventListener { + public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L; + private final CopyOnWriteArrayList<MessageCorrelationRule> rules = new CopyOnWriteArrayList<MessageCorrelationRule>(); + + public Callable<Message> getReply(SessionID sessionID, Exchange exchange) + throws InterruptedException, ExchangeTimedOutException { + + MessagePredicate messageCriteria = (MessagePredicate) exchange + .getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY); + final MessageCorrelationRule correlationRule = new MessageCorrelationRule( + exchange, sessionID, messageCriteria); + + final long timeout = exchange.getProperty( + QuickfixjProducer.CORRELATION_TIMEOUT_KEY, + DEFAULT_CORRELATION_TIMEOUT, Long.class); + + rules.add(correlationRule); + + return new Callable<Message>() { + @Override + public Message call() throws Exception { + if (!correlationRule.getLatch().await(timeout, + TimeUnit.MILLISECONDS)) { + throw new ExchangeTimedOutException( + correlationRule.getExchange(), timeout); + } + return correlationRule.getReplyMessage(); + } + }; + } + + @Override + public void onEvent(QuickfixjEventCategory eventCategory, + SessionID sessionID, Message message) throws Exception { + if (message != null) { + for (MessageCorrelationRule rule : rules) { + if (rule.getMessageCriteria().evaluate(message)) { + rule.setReplyMessage(message); + rules.remove(rule); + rule.getLatch().countDown(); + } + } + } + } + + private class MessageCorrelationRule { + private final Exchange exchange; + private final CountDownLatch latch = new CountDownLatch(1); + private final MessagePredicate messageCriteria; + + private Message replyMessage; + + public MessageCorrelationRule(Exchange exchange, SessionID sessionID, + MessagePredicate messageCriteria) { + this.exchange = exchange; + this.messageCriteria = messageCriteria; + } + + public void setReplyMessage(Message message) { + this.replyMessage = message; + } + + public Message getReplyMessage() { + return replyMessage; + } + + public CountDownLatch getLatch() { + return latch; + } + + public Exchange getExchange() { + return exchange; + } + + public MessagePredicate getMessageCriteria() { + return messageCriteria; + } + } + +} Added: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java?rev=1130604&view=auto ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java (added) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java Thu Jun 2 15:26:12 2011 @@ -0,0 +1,68 @@ +package org.apache.camel.component.quickfixj; + +import java.util.ArrayList; +import java.util.List; + +import quickfix.Field; +import quickfix.FieldMap; +import quickfix.FieldNotFound; +import quickfix.Message; +import quickfix.SessionID; +import quickfix.field.MsgType; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + +public class MessagePredicate { + private final List<Field<String>> headerCriteria = new ArrayList<Field<String>>(); + private final List<Field<String>> bodyCriteria = new ArrayList<Field<String>>(); + + public MessagePredicate(SessionID requestingSessionID, String msgType) { + // Reverse session ID for reply + // TODO may need to optionally include subID and locationID + addHeaderFieldIfPresent(SenderCompID.FIELD, requestingSessionID.getTargetCompID()); + addHeaderFieldIfPresent(TargetCompID.FIELD, requestingSessionID.getSenderCompID()); + withMessageType(msgType); + } + + private void addHeaderFieldIfPresent(int tag, String value) { + if (value != null && !"".equals(value)) { + withHeaderField(tag, value); + } + } + + public boolean evaluate(Message message) { + return evaluate(message, bodyCriteria) && evaluate(message.getHeader(), headerCriteria); + } + + private boolean evaluate(FieldMap fieldMap, List<Field<String>> criteria) { + for (Field<String> c : criteria) { + String value = null; + try { + if (fieldMap.isSetField(c.getField())) { + value = fieldMap.getString(c.getField()); + } + } catch (FieldNotFound e) { + // ignored, shouldn't happen + } + if (!c.getObject().equals(value)) { + return false; + } + } + return true; + } + + public MessagePredicate withField(int tag, String value) { + bodyCriteria.add(new Field<String>(tag, value)); + return this; + } + + public MessagePredicate withHeaderField(int tag, String value) { + headerCriteria.add(new Field<String>(tag, value)); + return this; + } + + private MessagePredicate withMessageType(String msgType) { + headerCriteria.add(new Field<String>(MsgType.FIELD, msgType)); + return this; + } +} Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java Thu Jun 2 15:26:12 2011 @@ -22,6 +22,7 @@ import java.net.URISyntaxException; import java.util.Collections; import java.util.HashMap; import java.util.Map; + import javax.management.JMException; import org.apache.camel.Endpoint; @@ -29,6 +30,7 @@ import org.apache.camel.impl.DefaultComp import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import quickfix.ConfigError; import quickfix.FieldConvertError; import quickfix.LogFactory; @@ -72,7 +74,7 @@ public class QuickfixjComponent extends } } - endpoint = new QuickfixjEndpoint(uri, getCamelContext()); + endpoint = new QuickfixjEndpoint(engine, uri, getCamelContext()); engine.addEventListener(endpoint); endpoints.put(uri, endpoint); } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java Thu Jun 2 15:26:12 2011 @@ -18,12 +18,17 @@ package org.apache.camel.component.quick import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; +import quickfix.MessageUtils; +import quickfix.Session; +import quickfix.SessionID; + public class QuickfixjConsumer extends DefaultConsumer { - public QuickfixjConsumer(Endpoint endpoint, Processor processor) { + public QuickfixjConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -31,9 +36,38 @@ public class QuickfixjConsumer extends D if (isStarted()) { try { getProcessor().process(exchange); + if (exchange.getPattern().isOutCapable() && exchange.hasOut()) { + sendOutMessage(exchange); + } } catch (Exception e) { exchange.setException(e); } } } + + private void sendOutMessage(Exchange exchange) { + try { + Message camelMessage = exchange.getOut(); + quickfix.Message quickfixjMessage = camelMessage.getBody(quickfix.Message.class); + + if (log.isDebugEnabled()) { + log.debug("Sending FIX message reply: " + quickfixjMessage.toString()); + } + + SessionID messageSessionID = MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class)); + + Session session = getSession(messageSessionID); + if (session == null) { + throw new IllegalStateException("Unknown session: " + messageSessionID); + } + + session.send(quickfixjMessage); + } catch (Exception e) { + exchange.setException(e); + } + } + + Session getSession(SessionID messageSessionID) { + return Session.lookupSession(messageSessionID); + } } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java Thu Jun 2 15:26:12 2011 @@ -25,73 +25,124 @@ import org.apache.camel.Exchange; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.component.quickfixj.converter.QuickfixjConverters; import org.apache.camel.impl.DefaultEndpoint; -import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import quickfix.Message; import quickfix.SessionID; -public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEventListener, MultipleConsumersSupport { - public static final String EVENT_CATEGORY_KEY = "EventCategory"; - public static final String SESSION_ID_KEY = "SessionID"; - public static final String MESSAGE_TYPE_KEY = "MessageType"; - public static final String DATA_DICTIONARY_KEY = "DataDictionary"; - - private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class); - - private SessionID sessionID; - private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>(); - - public QuickfixjEndpoint(String uri, CamelContext context) { - super(uri, context); - } - - protected SessionID getSessionID() { - return sessionID; - } - - public void setSessionID(SessionID sessionID) { - this.sessionID = sessionID; - } - - public Consumer createConsumer(Processor processor) throws Exception { - LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? sessionID : "No Session")); - QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor); - consumers.add(consumer); - return consumer; - } - - public Producer createProducer() throws Exception { - LOG.info("Creating QuickFIX/J producer: " + (sessionID != null ? sessionID : "No Session")); - return new QuickfixjProducer(this); - } - - public boolean isSingleton() { - return true; - } - - public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception { - if (this.sessionID == null || this.sessionID.equals(sessionID)) { - for (QuickfixjConsumer consumer : consumers) { - Exchange exchange = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory); - consumer.onExchange(exchange); - if (exchange.getException() != null) { - throw exchange.getException(); - } - } - } - } - - public boolean isMultipleConsumersSupported() { - return true; - } - - @Override - protected void doStop() throws Exception { - // clear list of consumers - consumers.clear(); - } +public class QuickfixjEndpoint extends DefaultEndpoint implements + QuickfixjEventListener, MultipleConsumersSupport { + public static final String EVENT_CATEGORY_KEY = "EventCategory"; + public static final String SESSION_ID_KEY = "SessionID"; + public static final String MESSAGE_TYPE_KEY = "MessageType"; + public static final String DATA_DICTIONARY_KEY = "DataDictionary"; + + private static final Logger LOG = LoggerFactory + .getLogger(QuickfixjEndpoint.class); + + private SessionID sessionID; + private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>(); + private final QuickfixjEngine engine; + + public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext context) { + super(uri, context); + this.engine = engine; + } + + public SessionID getSessionID() { + return sessionID; + } + + public void setSessionID(SessionID sessionID) { + this.sessionID = sessionID; + } + + public Consumer createConsumer(Processor processor) throws Exception { + LOG.info("Creating QuickFIX/J consumer: " + + (sessionID != null ? sessionID : "No Session") + + ", ExchangePattern=" + getExchangePattern()); + QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor); + consumers.add(consumer); + return consumer; + } + + public Producer createProducer() throws Exception { + LOG.info("Creating QuickFIX/J producer: " + + (sessionID != null ? sessionID : "No Session")); + if (isWildcarded()) { + throw new ResolveEndpointFailedException( + "Cannot create consumer on wildcarded session identifier: " + + sessionID); + } + return new QuickfixjProducer(this); + } + + public boolean isSingleton() { + return true; + } + + public void onEvent(QuickfixjEventCategory eventCategory, + SessionID sessionID, Message message) throws Exception { + if (this.sessionID == null || isMatching(sessionID)) { + for (QuickfixjConsumer consumer : consumers) { + Exchange exchange = QuickfixjConverters.toExchange(this, + sessionID, message, eventCategory); + consumer.onExchange(exchange); + if (exchange.getException() != null) { + throw exchange.getException(); + } + } + } + } + + private boolean isMatching(SessionID sessionID) { + return this.sessionID.equals(sessionID) + || (isMatching(this.sessionID.getBeginString(), + sessionID.getBeginString()) + && isMatching(this.sessionID.getSenderCompID(), + sessionID.getSenderCompID()) + && isMatching(this.sessionID.getSenderSubID(), + sessionID.getSenderSubID()) + && isMatching(this.sessionID.getSenderLocationID(), + sessionID.getSenderLocationID()) + && isMatching(this.sessionID.getTargetCompID(), + sessionID.getTargetCompID()) + && isMatching(this.sessionID.getTargetSubID(), + sessionID.getTargetSubID()) && isMatching( + this.sessionID.getTargetLocationID(), + sessionID.getTargetLocationID())); + } + + private boolean isMatching(String s1, String s2) { + return s1.equals("") || s1.equals("*") || s1.equals(s2); + } + + private boolean isWildcarded() { + return sessionID != null + && (sessionID.getBeginString().equals("*") + || sessionID.getSenderCompID().equals("*") + || sessionID.getSenderSubID().equals("*") + || sessionID.getSenderLocationID().equals("*") + || sessionID.getTargetCompID().equals("*") + || sessionID.getTargetSubID().equals("*") || sessionID + .getTargetLocationID().equals("*")); + } + + public boolean isMultipleConsumersSupported() { + return true; + } + + public QuickfixjEngine getEngine() { + return engine; + } + + @Override + protected void doStop() throws Exception { + // clear list of consumers + consumers.clear(); + } } Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java Thu Jun 2 15:26:12 2011 @@ -92,7 +92,8 @@ public class QuickfixjEngine { private final MessageStoreFactory messageStoreFactory; private final LogFactory sessionLogFactory; private final MessageFactory messageFactory; - + private final MessageCorrelator messageCorrelator = new MessageCorrelator(); + private boolean started; private List<QuickfixjEventListener> eventListeners = new CopyOnWriteArrayList<QuickfixjEventListener>(); @@ -119,6 +120,8 @@ public class QuickfixjEngine { MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride, MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException { + addEventListener(messageCorrelator); + this.uri = uri; this.forcedShutdown = forcedShutdown; @@ -463,6 +466,10 @@ public class QuickfixjEngine { return uri; } + public MessageCorrelator getMessageCorrelator() { + return messageCorrelator; + } + // For Testing Initiator getInitiator() { return initiator; Modified: camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java (original) +++ camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java Thu Jun 2 15:26:12 2011 @@ -16,6 +16,8 @@ */ package org.apache.camel.component.quickfixj; +import java.util.concurrent.Callable; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; @@ -26,31 +28,61 @@ import quickfix.Session; import quickfix.SessionID; public class QuickfixjProducer extends DefaultProducer { - private final SessionID sessionID; - + public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout"; + public static final String CORRELATION_CRITERIA_KEY = "CorrelationCriteria"; + + private final SessionID sessionID; + public QuickfixjProducer(Endpoint endpoint) { super(endpoint); sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID(); } public void process(Exchange exchange) throws Exception { - Message message = exchange.getIn().getBody(Message.class); - if (log.isDebugEnabled()) { - log.debug("Sending FIX message: " + message.toString()); - } - - SessionID messageSessionID = sessionID; - if (messageSessionID == null) { - messageSessionID = MessageUtils.getSessionID(message); - } - - Session session = Session.lookupSession(messageSessionID); - if (session == null) { - exchange.setException(new IllegalStateException("Unknown session: " + messageSessionID)); - return; - } - - session.send(message); + sendMessage(exchange, exchange.getIn()); } + void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage) + throws InterruptedException { + try { + Message message = camelMessage.getBody(Message.class); + if (log.isDebugEnabled()) { + log.debug("Sending FIX message: " + message.toString()); + } + + SessionID messageSessionID = sessionID; + if (messageSessionID == null) { + messageSessionID = MessageUtils.getSessionID(message); + } + + Session session = getSession(messageSessionID); + if (session == null) { + throw new IllegalStateException("Unknown session: " + + messageSessionID); + } + + Callable<Message> callable = null; + + if (exchange.getPattern().isOutCapable()) { + QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint(); + MessageCorrelator messageCorrelator = endpoint.getEngine() + .getMessageCorrelator(); + callable = messageCorrelator.getReply( + endpoint.getSessionID(), exchange); + } + + session.send(message); + + if (callable != null) { + Message reply = callable.call(); + exchange.getOut().setBody(reply); + } + } catch (Exception e) { + exchange.setException(e); + } + } + + Session getSession(SessionID messageSessionID) { + return Session.lookupSession(messageSessionID); + } } Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java Thu Jun 2 15:26:12 2011 @@ -28,21 +28,41 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import quickfix.FixVersions; +import quickfix.Message; +import quickfix.MessageUtils; +import quickfix.Session; +import quickfix.SessionID; +import quickfix.field.BeginString; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + public class QuickfixjConsumerTest { private Exchange mockExchange; private Processor mockProcessor; private Endpoint mockEndpoint; - + private SessionID sessionID; + private Message inboundFixMessage; + @Before public void setUp() { + mockExchange = Mockito.mock(Exchange.class); org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class); Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage); + inboundFixMessage = new Message(); + inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44); + inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER"); + inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET"); + sessionID = MessageUtils.getSessionID(inboundFixMessage); + + Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage); + mockProcessor = Mockito.mock(Processor.class); mockEndpoint = Mockito.mock(Endpoint.class); - Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange); + Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange); } @Test @@ -67,8 +87,7 @@ public class QuickfixjConsumerTest { } @Test - public void setExceptionOnExchange() throws Exception { - + public void setExceptionOnExchange() throws Exception { QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor); consumer.start(); @@ -79,5 +98,45 @@ public class QuickfixjConsumerTest { consumer.onExchange(mockExchange); Mockito.verify(mockExchange).setException(exception); - } + } + + @Test + public void setExceptionOnInOutExchange() throws Exception { + org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class); + Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut); + Mockito.when(mockExchange.hasOut()).thenReturn(true); + Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage); + Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(new Message()); + + QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, mockProcessor); + consumer.start(); + + // Simulate a message from the FIX engine + consumer.onExchange(mockExchange); + + Mockito.verify(mockExchange).setException(Mockito.isA(IllegalStateException.class)); + } + + @Test + public void processInOutExchange() throws Exception { + org.apache.camel.Message mockCamelOutMessage = Mockito.mock(org.apache.camel.Message.class); + Mockito.when(mockExchange.hasOut()).thenReturn(true); + Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage); + Message outboundFixMessage = new Message(); + Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage); + + QuickfixjConsumer consumer = Mockito.spy(new QuickfixjConsumer(mockEndpoint, mockProcessor)); + Session mockSession = Mockito.spy(TestSupport.createSession(sessionID)); + Mockito.doReturn(mockSession).when(consumer).getSession(MessageUtils.getReverseSessionID(inboundFixMessage)); + Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class)); + + consumer.start(); + + Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut); + + consumer.onExchange(mockExchange); + + Mockito.verify(mockExchange, Mockito.never()).setException(Mockito.isA(Exception.class)); + Mockito.verify(mockSession).send(outboundFixMessage); + } } Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java Thu Jun 2 15:26:12 2011 @@ -98,7 +98,7 @@ public class QuickfixjConvertersTest { @Test public void convertToExchange() { SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR"); - QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext); + QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", camelContext); Message message = new Message(); message.getHeader().setString(MsgType.FIELD, MsgType.ORDER_SINGLE); @@ -116,7 +116,7 @@ public class QuickfixjConvertersTest { @Test public void convertToExchangeWithNullMessage() { SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR"); - QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext); + QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", camelContext); Exchange exchange = QuickfixjConverters.toExchange(endpoint, sessionID, null, QuickfixjEventCategory.AppMessageSent); Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java Thu Jun 2 15:26:12 2011 @@ -16,33 +16,133 @@ */ package org.apache.camel.component.quickfixj; +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; + +import javax.management.JMException; + import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import quickfix.ConfigError; +import quickfix.FieldConvertError; import quickfix.FixVersions; import quickfix.Message; +import quickfix.MessageUtils; +import quickfix.Session; import quickfix.SessionID; +import quickfix.field.BeginString; +import quickfix.field.MsgType; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; +import quickfix.fix42.Email; public class QuickfixjProducerTest { - + private Exchange mockExchange; + private QuickfixjEndpoint mockEndpoint; + private org.apache.camel.Message mockCamelMessage; + private QuickfixjProducer producer; + private SessionID sessionID; + private Message inboundFixMessage; + private QuickfixjEngine quickfixjEngine; + + @Before + public void setUp() throws ConfigError, FieldConvertError, IOException, JMException { + mockExchange = Mockito.mock(Exchange.class); + mockEndpoint = Mockito.mock(QuickfixjEndpoint.class); + mockCamelMessage = Mockito.mock(org.apache.camel.Message.class); + Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage); + Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly); + + quickfixjEngine = TestSupport.createEngine(); + Mockito.when(mockEndpoint.getEngine()).thenReturn(quickfixjEngine); + + inboundFixMessage = new Message(); + inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44); + inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER"); + inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET"); + sessionID = MessageUtils.getSessionID(inboundFixMessage); + + Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage); + + Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID); + + producer = Mockito.spy(new QuickfixjProducer(mockEndpoint)); + } + + @SuppressWarnings("serial") + public class TestException extends RuntimeException { + + } + @Test public void setExceptionOnExchange() throws Exception { - Exchange mockExchange = Mockito.mock(Exchange.class); + Session mockSession = Mockito.spy(TestSupport.createSession(sessionID)); + Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage)); + Mockito.doThrow(new TestException()).when(mockSession).send(Mockito.isA(Message.class)); - QuickfixjEndpoint mockEndpoint = Mockito.mock(QuickfixjEndpoint.class); - org.apache.camel.Message mockCamelMessage = Mockito.mock(org.apache.camel.Message.class); - Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage); - Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(new Message()); + producer.process(mockExchange); + Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class)); + } + + @Test + public void processInOnlyExchange() throws Exception { + Session mockSession = Mockito.spy(TestSupport.createSession(sessionID)); + Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage)); + Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class)); + + producer.process(mockExchange); + + Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class)); + Mockito.verify(mockSession).send(inboundFixMessage); + } - SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); - Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID); + @Test + public void processInOutExchange() throws Exception { + Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut); + Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)). + thenReturn(new MessagePredicate(sessionID, MsgType.EMAIL)); + Mockito.when(mockExchange.getProperty( + QuickfixjProducer.CORRELATION_TIMEOUT_KEY, + 1000L, Long.class)).thenReturn(5000L); + + org.apache.camel.Message mockOutboundCamelMessage = Mockito.mock(org.apache.camel.Message.class); + Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage); - QuickfixjProducer producer = new QuickfixjProducer(mockEndpoint); + final Message outboundFixMessage = new Email(); + outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET"); + outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER"); + Session mockSession = Mockito.spy(TestSupport.createSession(sessionID)); + Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage)); + Mockito.doAnswer(new Answer<Boolean>() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + new Timer().schedule(new TimerTask() { + @Override + public void run() { + try { + quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived, sessionID, outboundFixMessage); + } catch (Exception e) { + e.printStackTrace(); + } + } + }, 10); + return true; + } + }).when(mockSession).send(Mockito.isA(Message.class)); + producer.process(mockExchange); - Mockito.verify(mockExchange).setException(Matchers.isA(IllegalStateException.class)); + Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class)); + Mockito.verify(mockSession).send(inboundFixMessage); + Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage); } } Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java Thu Jun 2 15:26:12 2011 @@ -19,7 +19,23 @@ package org.apache.camel.component.quick import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Date; +import javax.management.JMException; + +import org.mockito.Mockito; + +import quickfix.Acceptor; +import quickfix.Application; +import quickfix.ConfigError; +import quickfix.DefaultSessionFactory; +import quickfix.FieldConvertError; +import quickfix.LogFactory; +import quickfix.MessageFactory; +import quickfix.MessageStore; +import quickfix.MessageStoreFactory; +import quickfix.Session; +import quickfix.SessionFactory; import quickfix.SessionID; import quickfix.SessionSettings; import quickfix.field.EmailThreadID; @@ -55,4 +71,51 @@ public final class TestSupport { email.addGroup(text); return email; } + + public static Session createSession(SessionID sessionID) throws ConfigError, IOException { + MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class); + MessageStore mockMessageStore = Mockito.mock(MessageStore.class); + Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date()); + + Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore); + + DefaultSessionFactory factory = new DefaultSessionFactory( + Mockito.mock(Application.class), + mockMessageStoreFactory, + Mockito.mock(LogFactory.class)); + + SessionSettings settings = new SessionSettings(); + settings.setLong(Session.SETTING_HEARTBTINT, 10); + settings.setString(Session.SETTING_START_TIME, "00:00:00"); + settings.setString(Session.SETTING_END_TIME, "00:00:00"); + settings.setString(SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE); + settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false); + + return factory.create(sessionID, settings); + + } + + public static QuickfixjEngine createEngine() throws ConfigError, FieldConvertError, IOException, JMException { + SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET"); + + MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class); + MessageStore mockMessageStore = Mockito.mock(MessageStore.class); + Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date()); + Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore); + + SessionSettings settings = new SessionSettings(); + + settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10); + settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00"); + settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00"); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE); + settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000); + settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, false); + + return new QuickfixjEngine("", settings, false, + mockMessageStoreFactory, + Mockito.mock(LogFactory.class), + Mockito.mock(MessageFactory.class)); + + } } Added: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java?rev=1130604&view=auto ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java (added) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java Thu Jun 2 15:26:12 2011 @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Body; +import org.apache.camel.Exchange; +import org.apache.camel.Header; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.MessagePredicate; +import org.apache.camel.component.quickfixj.QuickfixjComponent; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.QuickfixjProducer; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.eclipse.jetty.util.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import quickfix.FieldNotFound; +import quickfix.SessionID; +import quickfix.field.AvgPx; +import quickfix.field.ClOrdID; +import quickfix.field.CumQty; +import quickfix.field.ExecID; +import quickfix.field.ExecTransType; +import quickfix.field.ExecType; +import quickfix.field.LeavesQty; +import quickfix.field.MsgType; +import quickfix.field.OrdStatus; +import quickfix.field.OrderID; +import quickfix.field.Side; +import quickfix.field.Symbol; +import quickfix.fix42.ExecutionReport; +import quickfix.fix42.OrderStatusRequest; + +public class RequestReplyExample { + private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class); + + public static void main(String[] args) throws Exception { + new RequestReplyExample().run(); + } + + public void run() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + final CountDownLatch logonLatch = new CountDownLatch(1); + final String orderStatusServiceUrl = "http://localhost:9123/order/status"; + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() throws Exception { + // Synchronize the logon so we don't start sending status requests too early + from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET"). + filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)). + bean(new CountDownLatchDecrementer("logon", logonLatch)); + + // Incoming status requests are converted to InOut exchange pattern and passed to the + // order status service. The response is sent back to the session making the request. + from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut"). + filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST)). + bean(new MarketOrderStatusService()); + + from ("jetty:" + orderStatusServiceUrl). + bean(new OrderStatusRequestTransformer()). + routingSlip(bean(FixSessionRouter.class, "route")). + bean(new QuickfixjMessageJsonTransformer()); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + if (!logonLatch.await(5L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logon did not succeed"); + } + + // Send a request to the order status web service. + // Verify that the response is a JSON response. + + URL orderStatusUrl = new URL(orderStatusServiceUrl + "?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc"); + HttpURLConnection connection = (HttpURLConnection) orderStatusUrl.openConnection(); + BufferedReader orderStatusReply = new BufferedReader(new InputStreamReader(connection.getInputStream())); + String line = orderStatusReply.readLine(); + if (!line.equals("\"message\": {")) { + throw new Exception("Don't appear to be a JSON response"); + } + else { + StringBuilder sb = new StringBuilder(); + while (line != null) { + sb.append(line); + sb.append('\n'); + line = orderStatusReply.readLine(); + } + Log.info("Web request response:\n" + sb); + + } + orderStatusReply.close(); + + LOG.info("Shutting down Camel context"); + context.stop(); + + LOG.info("Example complete"); + } + + public static class OrderStatusRequestTransformer { + public void transform(Exchange exchange) throws FieldNotFound { + String sessionID = (String) exchange.getIn().getHeader("sessionID"); + String orderID = (String) exchange.getIn().getHeader("orderID"); + + OrderStatusRequest request = new OrderStatusRequest(new ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY)); + request.set(new OrderID(orderID)); + + // Look for a reply execution report back to the requester session + // and having the requested OrderID. This is a loose correlation but the best + // we can do with FIX 4.2. Newer versions of FIX have an optional explicit correlation field. + exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY, + new MessagePredicate(new SessionID(sessionID), MsgType.EXECUTION_REPORT) + .withField(OrderID.FIELD, request.getString(OrderID.FIELD))); + + exchange.getIn().setBody(request); + } + } + + public static class MarketOrderStatusService { + private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class); + + public ExecutionReport getOrderStatus(OrderStatusRequest request) throws FieldNotFound { + LOG.info("Received order status request for orderId=" + request.getOrderID().getValue()); + return new ExecutionReport( + request.getOrderID(), new ExecID(UUID.randomUUID().toString()), + new ExecTransType(ExecTransType.STATUS), + new ExecType(ExecType.REJECTED), + new OrdStatus(OrdStatus.REJECTED), + new Symbol("GOOG"), + new Side(Side.BUY), + new LeavesQty(100), + new CumQty(0), + new AvgPx(0)); + } + } + + public static class FixSessionRouter { + public String route(@Header("sessionID") String sessionID, @Body Object body) { + return String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID); + } + } +} Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java Thu Jun 2 15:26:12 2011 @@ -63,7 +63,6 @@ public class TradeExecutorExample { @Override public void configure() throws Exception { // Release latch when session logon events are received - // We expect four logon events (four sessions) from("quickfix:examples/inprocess.cfg"). filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)). bean(new CountDownLatchDecrementer("logon", logonLatch)); Modified: camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java (original) +++ camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java Thu Jun 2 15:26:12 2011 @@ -18,16 +18,32 @@ package org.apache.camel.component.quick import java.util.Iterator; +import quickfix.ConfigError; import quickfix.DataDictionary; import quickfix.Field; import quickfix.FieldMap; +import quickfix.FieldNotFound; import quickfix.FieldType; import quickfix.Group; import quickfix.Message; - +import quickfix.MessageUtils; +import quickfix.Session; +import quickfix.SessionID; public class QuickfixjMessageJsonTransformer { - + + public String transform(Message message) throws FieldNotFound, ConfigError { + SessionID sessionID = MessageUtils.getSessionID(message); + Session session = Session.lookupSession(sessionID); + DataDictionary dataDictionary = session.getDataDictionary(); + + if (dataDictionary == null) { + throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session"); + } + + return transform(message, dataDictionary); + } + public String transform(Message message, DataDictionary dataDictionary) { return transform(message, "", dataDictionary); } Modified: camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg?rev=1130604&r1=1130603&r2=1130604&view=diff ============================================================================== --- camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg (original) +++ camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg Thu Jun 2 15:26:12 2011 @@ -23,6 +23,7 @@ UseJmx=Y SocketAcceptProtocol=VM_PIPE SocketConnectProtocol=VM_PIPE +HeartBtInt=120 # # Initiator for simulating a trader