Author: hadrian
Date: Thu Jun  2 15:26:12 2011
New Revision: 1130604

URL: http://svn.apache.org/viewvc?rev=1130604&view=rev
Log:
CAMEL-4038. Original contributed patch applied with thanks to Steve. Checkstyle 
fixes to follow

Added:
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
Modified:
    camel/trunk/components/camel-quickfix/pom.xml
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
    
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
    
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
    
camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg

Modified: camel/trunk/components/camel-quickfix/pom.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/pom.xml?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- camel/trunk/components/camel-quickfix/pom.xml (original)
+++ camel/trunk/components/camel-quickfix/pom.xml Thu Jun  2 15:26:12 2011
@@ -67,12 +67,16 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
 
+        <dependency>
+                       <groupId>org.apache.camel</groupId>
+                       <artifactId>camel-jetty</artifactId>
+            <scope>test</scope>
+        </dependency>
                <dependency>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                        <scope>test</scope>
                </dependency>
-
            <dependency>
              <groupId>org.springframework</groupId>
              <artifactId>spring-test</artifactId>

Added: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java?rev=1130604&view=auto
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
 (added)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
 Thu Jun  2 15:26:12 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+
+import quickfix.Message;
+import quickfix.SessionID;
+
+/**
+ * Correlates outgoing request exchanges with incoming FIX reply messages.
+ * <p>
+ * {@link #getReply(SessionID, Exchange)} registers a correlation rule built from the
+ * {@link MessagePredicate} stored on the exchange and hands back a {@link Callable}
+ * that blocks until a message satisfying that predicate is passed to
+ * {@link #onEvent(QuickfixjEventCategory, SessionID, Message)} or the timeout elapses.
+ */
+public class MessageCorrelator implements QuickfixjEventListener {
+    /** Timeout in milliseconds used when the exchange carries no correlation timeout property. */
+    public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L;
+
+    /** Rules still waiting for a reply; copy-on-write so onEvent can remove while iterating. */
+    private final CopyOnWriteArrayList<MessageCorrelationRule> rules = new CopyOnWriteArrayList<MessageCorrelationRule>();
+
+    /**
+     * Registers a correlation rule for the exchange and returns a task that waits for the reply.
+     * The rule is registered before this method returns, so a reply arriving between this call
+     * and the invocation of the returned task is not lost.
+     *
+     * @param sessionID the session the request is sent on; accepted for API symmetry but not
+     *                  used for matching, which is driven entirely by the message criteria
+     * @param exchange the request exchange; must carry a {@link MessagePredicate} under
+     *                 {@link QuickfixjProducer#CORRELATION_CRITERIA_KEY} and may carry a timeout
+     *                 in milliseconds under {@link QuickfixjProducer#CORRELATION_TIMEOUT_KEY}
+     * @return a task that returns the correlated reply, or throws
+     *         {@link ExchangeTimedOutException} if none arrives within the timeout
+     * @throws IllegalArgumentException if the exchange has no correlation criteria
+     */
+    public Callable<Message> getReply(SessionID sessionID, Exchange exchange)
+        throws InterruptedException, ExchangeTimedOutException {
+
+        MessagePredicate messageCriteria = (MessagePredicate) exchange
+            .getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY);
+        if (messageCriteria == null) {
+            // Fail here rather than with a NullPointerException inside onEvent,
+            // which would be raised for every subsequent message of the engine.
+            throw new IllegalArgumentException("Exchange property '"
+                + QuickfixjProducer.CORRELATION_CRITERIA_KEY + "' must be set to correlate a reply");
+        }
+        final MessageCorrelationRule correlationRule = new MessageCorrelationRule(exchange, messageCriteria);
+
+        final long timeout = exchange.getProperty(
+            QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+            DEFAULT_CORRELATION_TIMEOUT, Long.class);
+
+        rules.add(correlationRule);
+
+        return new Callable<Message>() {
+            @Override
+            public Message call() throws Exception {
+                try {
+                    if (!correlationRule.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
+                        throw new ExchangeTimedOutException(correlationRule.getExchange(), timeout);
+                    }
+                    return correlationRule.getReplyMessage();
+                } finally {
+                    // Drop the rule on timeout or interruption as well; otherwise it would stay
+                    // registered forever. A no-op when onEvent has already removed it.
+                    rules.remove(correlationRule);
+                }
+            }
+        };
+    }
+
+    /**
+     * Offers the message to every pending rule and completes those whose criteria it satisfies.
+     * Events without a message are ignored.
+     */
+    @Override
+    public void onEvent(QuickfixjEventCategory eventCategory,
+                        SessionID sessionID, Message message) throws Exception {
+        if (message == null) {
+            return;
+        }
+        for (MessageCorrelationRule rule : rules) {
+            // remove() returns true for exactly one caller, so a rule is completed at most once
+            // even when events are dispatched concurrently.
+            if (rule.getMessageCriteria().evaluate(message) && rules.remove(rule)) {
+                rule.setReplyMessage(message);
+                rule.getLatch().countDown();
+            }
+        }
+    }
+
+    /** A pending request together with the criteria its reply has to satisfy. */
+    private static final class MessageCorrelationRule {
+        private final Exchange exchange;
+        private final CountDownLatch latch = new CountDownLatch(1);
+        private final MessagePredicate messageCriteria;
+
+        // Written before latch.countDown() and read after latch.await(),
+        // so the latch provides the required visibility.
+        private Message replyMessage;
+
+        MessageCorrelationRule(Exchange exchange, MessagePredicate messageCriteria) {
+            this.exchange = exchange;
+            this.messageCriteria = messageCriteria;
+        }
+
+        void setReplyMessage(Message message) {
+            this.replyMessage = message;
+        }
+
+        Message getReplyMessage() {
+            return replyMessage;
+        }
+
+        CountDownLatch getLatch() {
+            return latch;
+        }
+
+        Exchange getExchange() {
+            return exchange;
+        }
+
+        MessagePredicate getMessageCriteria() {
+            return messageCriteria;
+        }
+    }
+
+}

Added: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java?rev=1130604&view=auto
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
 (added)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
 Thu Jun  2 15:26:12 2011
@@ -0,0 +1,68 @@
+package org.apache.camel.component.quickfixj;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import quickfix.Field;
+import quickfix.FieldMap;
+import quickfix.FieldNotFound;
+import quickfix.Message;
+import quickfix.SessionID;
+import quickfix.field.MsgType;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
+/**
+ * Predicate over FIX messages expressed as a set of expected header and body field values.
+ * A message satisfies the predicate when every expected field is set to exactly the
+ * expected string value.
+ */
+public class MessagePredicate {
+    private final List<Field<String>> headerCriteria = new ArrayList<Field<String>>();
+    private final List<Field<String>> bodyCriteria = new ArrayList<Field<String>>();
+
+    /**
+     * Builds a predicate for the reply to a request sent on the given session.
+     *
+     * @param requestingSessionID session the request is sent on; its comp IDs are swapped
+     *                            to describe the reply direction
+     * @param msgType the expected message type of the reply
+     */
+    public MessagePredicate(SessionID requestingSessionID, String msgType) {
+        // Reverse session ID for reply
+        // TODO may need to optionally include subID and locationID
+        String replySender = requestingSessionID.getTargetCompID();
+        addHeaderFieldIfPresent(SenderCompID.FIELD, replySender);
+        String replyTarget = requestingSessionID.getSenderCompID();
+        addHeaderFieldIfPresent(TargetCompID.FIELD, replyTarget);
+        withMessageType(msgType);
+    }
+
+    /** Adds a header criterion unless the value is null or empty. */
+    private void addHeaderFieldIfPresent(int tag, String value) {
+        if (value == null || "".equals(value)) {
+            return;
+        }
+        withHeaderField(tag, value);
+    }
+
+    /**
+     * Checks the message against all criteria, body fields first and header fields second.
+     *
+     * @param message the message to test
+     * @return true if every body and header criterion matches
+     */
+    public boolean evaluate(Message message) {
+        if (!evaluate(message, bodyCriteria)) {
+            return false;
+        }
+        return evaluate(message.getHeader(), headerCriteria);
+    }
+
+    /** Returns true when each expected field equals the value found in the field map. */
+    private boolean evaluate(FieldMap fieldMap, List<Field<String>> criteria) {
+        for (Field<String> expected : criteria) {
+            String actual = readValue(fieldMap, expected.getField());
+            if (!expected.getObject().equals(actual)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Reads the string value of a tag, or null when the tag is not set. */
+    private String readValue(FieldMap fieldMap, int tag) {
+        try {
+            return fieldMap.isSetField(tag) ? fieldMap.getString(tag) : null;
+        } catch (FieldNotFound e) {
+            // ignored, shouldn't happen
+            return null;
+        }
+    }
+
+    /**
+     * Requires a body field to have the given value.
+     *
+     * @return this predicate, for chaining
+     */
+    public MessagePredicate withField(int tag, String value) {
+        Field<String> criterion = new Field<String>(tag, value);
+        bodyCriteria.add(criterion);
+        return this;
+    }
+
+    /**
+     * Requires a header field to have the given value.
+     *
+     * @return this predicate, for chaining
+     */
+    public MessagePredicate withHeaderField(int tag, String value) {
+        Field<String> criterion = new Field<String>(tag, value);
+        headerCriteria.add(criterion);
+        return this;
+    }
+
+    /** Requires the MsgType header field to have the given value. */
+    private MessagePredicate withMessageType(String msgType) {
+        Field<String> criterion = new Field<String>(MsgType.FIELD, msgType);
+        headerCriteria.add(criterion);
+        return this;
+    }
+}

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java
 Thu Jun  2 15:26:12 2011
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.management.JMException;
 
 import org.apache.camel.Endpoint;
@@ -29,6 +30,7 @@ import org.apache.camel.impl.DefaultComp
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import quickfix.ConfigError;
 import quickfix.FieldConvertError;
 import quickfix.LogFactory;
@@ -72,7 +74,7 @@ public class QuickfixjComponent extends 
                     }
                 }
 
-                endpoint = new QuickfixjEndpoint(uri, getCamelContext());
+                endpoint = new QuickfixjEndpoint(engine, uri, 
getCamelContext());
                 engine.addEventListener(endpoint);
                 endpoints.put(uri, endpoint);
             }

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
 Thu Jun  2 15:26:12 2011
@@ -18,12 +18,17 @@ package org.apache.camel.component.quick
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+
 public class QuickfixjConsumer extends DefaultConsumer {
 
-    public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
+       public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
@@ -31,9 +36,38 @@ public class QuickfixjConsumer extends D
         if (isStarted()) {
             try {
                 getProcessor().process(exchange);
+                if (exchange.getPattern().isOutCapable() && exchange.hasOut()) 
{
+                       sendOutMessage(exchange);
+                }
             } catch (Exception e) {
                 exchange.setException(e);
             }
         }
     }
+
+       /**
+        * Sends the exchange's OUT body as a FIX reply on the session obtained by
+        * reversing the session ID of the inbound message. Any failure is recorded
+        * on the exchange instead of being thrown.
+        */
+       private void sendOutMessage(Exchange exchange) {
+               try {
+                       Message outMessage = exchange.getOut();
+                       quickfix.Message reply = outMessage.getBody(quickfix.Message.class);
+
+                       if (log.isDebugEnabled()) {
+                               log.debug("Sending FIX message reply: " + reply.toString());
+                       }
+
+                       // The reply travels in the opposite direction of the request.
+                       quickfix.Message request = exchange.getIn().getBody(quickfix.Message.class);
+                       SessionID replySessionID = MessageUtils.getReverseSessionID(request);
+
+                       Session replySession = getSession(replySessionID);
+                       if (replySession != null) {
+                               replySession.send(reply);
+                       } else {
+                               throw new IllegalStateException("Unknown session: " + replySessionID);
+                       }
+               } catch (Exception e) {
+                       exchange.setException(e);
+               }
+       }
+
+       /**
+        * Looks up the QuickFIX/J session registered for the given session ID.
+        * NOTE(review): presumably package-private so tests can substitute a session - confirm.
+        *
+        * @param messageSessionID the ID of the session to look up
+        * @return the result of {@code Session.lookupSession}; callers treat null as an unknown session
+        */
+       Session getSession(SessionID messageSessionID) {
+               return Session.lookupSession(messageSessionID);
+       }
 }

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
 Thu Jun  2 15:26:12 2011
@@ -25,73 +25,124 @@ import org.apache.camel.Exchange;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.component.quickfixj.converter.QuickfixjConverters;
 import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import quickfix.Message;
 import quickfix.SessionID;
 
-public class QuickfixjEndpoint extends DefaultEndpoint implements 
QuickfixjEventListener, MultipleConsumersSupport {
-    public static final String EVENT_CATEGORY_KEY = "EventCategory";
-    public static final String SESSION_ID_KEY = "SessionID";
-    public static final String MESSAGE_TYPE_KEY = "MessageType";
-    public static final String DATA_DICTIONARY_KEY = "DataDictionary";
-    
-    private static final Logger LOG = 
LoggerFactory.getLogger(QuickfixjEndpoint.class);
-    
-    private SessionID sessionID;
-    private final List<QuickfixjConsumer> consumers = new 
CopyOnWriteArrayList<QuickfixjConsumer>();
-    
-    public QuickfixjEndpoint(String uri, CamelContext context) {
-        super(uri, context);
-    }
-
-    protected SessionID getSessionID() {
-        return sessionID;
-    }
-
-    public void setSessionID(SessionID sessionID) {
-        this.sessionID = sessionID;
-    }
-    
-    public Consumer createConsumer(Processor processor) throws Exception {
-        LOG.info("Creating QuickFIX/J consumer: " + (sessionID != null ? 
sessionID : "No Session"));
-        QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
-        consumers.add(consumer);
-        return consumer;
-    }
-
-    public Producer createProducer() throws Exception {
-        LOG.info("Creating QuickFIX/J producer: " + (sessionID != null ? 
sessionID : "No Session"));
-        return new QuickfixjProducer(this);
-    }
-
-    public boolean isSingleton() {
-        return true;
-    }
-
-    public void onEvent(QuickfixjEventCategory eventCategory, SessionID 
sessionID, Message message) throws Exception {
-        if (this.sessionID == null || this.sessionID.equals(sessionID)) {
-            for (QuickfixjConsumer consumer : consumers) {
-                Exchange exchange = QuickfixjConverters.toExchange(this, 
sessionID, message, eventCategory);
-                consumer.onExchange(exchange);
-                if (exchange.getException() != null) {
-                    throw exchange.getException();
-                }
-            }
-        }
-    }
-    
-    public boolean isMultipleConsumersSupported() {
-        return true;
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        // clear list of consumers
-        consumers.clear();
-    }
+/**
+ * Camel endpoint bound to a QuickFIX/J engine and, optionally, to a session ID.
+ * <p>
+ * Engine events are forwarded to every consumer created from this endpoint when the
+ * endpoint has no session ID or when the event's session ID matches it. A component of the
+ * endpoint's session ID that is empty or {@code "*"} matches any value. Producers cannot be
+ * created for a session ID that contains {@code "*"}.
+ */
+public class QuickfixjEndpoint extends DefaultEndpoint implements
+    QuickfixjEventListener, MultipleConsumersSupport {
+    public static final String EVENT_CATEGORY_KEY = "EventCategory";
+    public static final String SESSION_ID_KEY = "SessionID";
+    public static final String MESSAGE_TYPE_KEY = "MessageType";
+    public static final String DATA_DICTIONARY_KEY = "DataDictionary";
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEndpoint.class);
+
+    private SessionID sessionID;
+    private final List<QuickfixjConsumer> consumers = new CopyOnWriteArrayList<QuickfixjConsumer>();
+    private final QuickfixjEngine engine;
+
+    /**
+     * @param engine the engine whose events this endpoint receives
+     * @param uri the endpoint URI
+     * @param context the owning Camel context
+     */
+    public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext context) {
+        super(uri, context);
+        this.engine = engine;
+    }
+
+    /** Returns the session ID this endpoint is bound to, or null when it accepts all sessions. */
+    public SessionID getSessionID() {
+        return sessionID;
+    }
+
+    public void setSessionID(SessionID sessionID) {
+        this.sessionID = sessionID;
+    }
+
+    /** Creates a consumer and registers it to receive this endpoint's engine events. */
+    public Consumer createConsumer(Processor processor) throws Exception {
+        LOG.info("Creating QuickFIX/J consumer: "
+            + (sessionID != null ? sessionID : "No Session")
+            + ", ExchangePattern=" + getExchangePattern());
+        QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
+        consumers.add(consumer);
+        return consumer;
+    }
+
+    /**
+     * Creates a producer for this endpoint.
+     *
+     * @throws ResolveEndpointFailedException if the session ID contains a {@code "*"} wildcard
+     */
+    public Producer createProducer() throws Exception {
+        LOG.info("Creating QuickFIX/J producer: "
+            + (sessionID != null ? sessionID : "No Session"));
+        if (isWildcarded()) {
+            // The message previously said "consumer" although it is the producer that is refused.
+            throw new ResolveEndpointFailedException(
+                "Cannot create producer on wildcarded session identifier: " + sessionID);
+        }
+        return new QuickfixjProducer(this);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    /**
+     * Converts a matching engine event into an exchange for each registered consumer and
+     * rethrows the first exception a consumer leaves on its exchange.
+     */
+    public void onEvent(QuickfixjEventCategory eventCategory,
+                        SessionID sessionID, Message message) throws Exception {
+        if (this.sessionID == null || isMatching(sessionID)) {
+            for (QuickfixjConsumer consumer : consumers) {
+                Exchange exchange = QuickfixjConverters.toExchange(this, sessionID, message, eventCategory);
+                consumer.onExchange(exchange);
+                if (exchange.getException() != null) {
+                    throw exchange.getException();
+                }
+            }
+        }
+    }
+
+    /** Exact match, or a component-wise match in which this endpoint's components may be wildcards. */
+    private boolean isMatching(SessionID sessionID) {
+        if (this.sessionID.equals(sessionID)) {
+            return true;
+        }
+        return isMatching(this.sessionID.getBeginString(), sessionID.getBeginString())
+            && isMatching(this.sessionID.getSenderCompID(), sessionID.getSenderCompID())
+            && isMatching(this.sessionID.getSenderSubID(), sessionID.getSenderSubID())
+            && isMatching(this.sessionID.getSenderLocationID(), sessionID.getSenderLocationID())
+            && isMatching(this.sessionID.getTargetCompID(), sessionID.getTargetCompID())
+            && isMatching(this.sessionID.getTargetSubID(), sessionID.getTargetSubID())
+            && isMatching(this.sessionID.getTargetLocationID(), sessionID.getTargetLocationID());
+    }
+
+    /** An empty or "*" pattern (s1) matches anything; otherwise the values must be equal. */
+    private boolean isMatching(String s1, String s2) {
+        return s1.equals("") || s1.equals("*") || s1.equals(s2);
+    }
+
+    /** Returns true when any component of the session ID is the "*" wildcard. */
+    private boolean isWildcarded() {
+        return sessionID != null
+            && (sessionID.getBeginString().equals("*")
+                || sessionID.getSenderCompID().equals("*")
+                || sessionID.getSenderSubID().equals("*")
+                || sessionID.getSenderLocationID().equals("*")
+                || sessionID.getTargetCompID().equals("*")
+                || sessionID.getTargetSubID().equals("*")
+                || sessionID.getTargetLocationID().equals("*"));
+    }
+
+    public boolean isMultipleConsumersSupported() {
+        return true;
+    }
+
+    /** Returns the engine this endpoint was created for. */
+    public QuickfixjEngine getEngine() {
+        return engine;
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // clear list of consumers
+        consumers.clear();
+    }
+}

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
 Thu Jun  2 15:26:12 2011
@@ -92,7 +92,8 @@ public class QuickfixjEngine {
     private final MessageStoreFactory messageStoreFactory;
     private final LogFactory sessionLogFactory;
     private final MessageFactory messageFactory;
-
+    private final MessageCorrelator messageCorrelator = new 
MessageCorrelator();
+    
     private boolean started;
     private List<QuickfixjEventListener> eventListeners = new 
CopyOnWriteArrayList<QuickfixjEventListener>();
 
@@ -119,6 +120,8 @@ public class QuickfixjEngine {
             MessageStoreFactory messageStoreFactoryOverride, LogFactory 
sessionLogFactoryOverride,
             MessageFactory messageFactoryOverride) throws ConfigError, 
FieldConvertError, IOException, JMException {
 
+       addEventListener(messageCorrelator);
+       
         this.uri = uri;
         this.forcedShutdown = forcedShutdown;
         
@@ -463,6 +466,10 @@ public class QuickfixjEngine {
         return uri;
     }
 
+    /**
+     * Returns the message correlator held by this engine.
+     */
+    public MessageCorrelator getMessageCorrelator() {
+               return messageCorrelator;
+       }
+
     // For Testing
     Initiator getInitiator() {
         return initiator;

Modified: 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
 Thu Jun  2 15:26:12 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.quickfixj;
 
+import java.util.concurrent.Callable;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
@@ -26,31 +28,61 @@ import quickfix.Session;
 import quickfix.SessionID;
 
 public class QuickfixjProducer extends DefaultProducer {
-    private final SessionID sessionID;
-
+    public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout";
+    public static final String CORRELATION_CRITERIA_KEY = 
"CorrelationCriteria";
+       
+       private final SessionID sessionID;
+    
     public QuickfixjProducer(Endpoint endpoint) {
         super(endpoint);
         sessionID = ((QuickfixjEndpoint) getEndpoint()).getSessionID();
     }
 
     public void process(Exchange exchange) throws Exception {
-        Message message = exchange.getIn().getBody(Message.class);
-        if (log.isDebugEnabled()) {
-            log.debug("Sending FIX message: " + message.toString());
-        }
-        
-        SessionID messageSessionID = sessionID;
-        if (messageSessionID == null) {
-            messageSessionID = MessageUtils.getSessionID(message);
-        }
-        
-        Session session = Session.lookupSession(messageSessionID);
-        if (session == null) {
-            exchange.setException(new IllegalStateException("Unknown session: 
" + messageSessionID));
-            return;
-        }
-        
-        session.send(message);
+        sendMessage(exchange, exchange.getIn());
     }
 
+       /**
+        * Sends the FIX message carried by the given Camel message and, for out-capable
+        * exchanges, waits for the correlated reply and stores it as the OUT body.
+        * <p>
+        * The session is the producer's own session ID or, when that is null, the one
+        * derived from the message itself. Failures are recorded on the exchange rather
+        * than thrown; if the thread is interrupted while waiting for the reply, its
+        * interrupt status is restored before returning.
+        *
+        * @param exchange the exchange being processed; receives the reply or the exception
+        * @param camelMessage the Camel message whose body is the FIX message to send
+        */
+       void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage)
+                       throws InterruptedException {
+               try {
+                       Message message = camelMessage.getBody(Message.class);
+                       if (log.isDebugEnabled()) {
+                               log.debug("Sending FIX message: " + message.toString());
+                       }
+
+                       SessionID messageSessionID = sessionID;
+                       if (messageSessionID == null) {
+                               messageSessionID = MessageUtils.getSessionID(message);
+                       }
+
+                       Session session = getSession(messageSessionID);
+                       if (session == null) {
+                               throw new IllegalStateException("Unknown session: "
+                                               + messageSessionID);
+                       }
+
+                       Callable<Message> callable = null;
+
+                       if (exchange.getPattern().isOutCapable()) {
+                               // Register the correlation before sending so that a fast
+                               // reply cannot arrive ahead of the rule.
+                               QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+                               MessageCorrelator messageCorrelator = endpoint.getEngine()
+                                               .getMessageCorrelator();
+                               callable = messageCorrelator.getReply(
+                                               endpoint.getSessionID(), exchange);
+                       }
+
+                       session.send(message);
+
+                       if (callable != null) {
+                               Message reply = callable.call();
+                               exchange.getOut().setBody(reply);
+                       }
+               } catch (InterruptedException e) {
+                       // Do not swallow the interruption: restore the flag so that the
+                       // calling thread can still observe it.
+                       Thread.currentThread().interrupt();
+                       exchange.setException(e);
+               } catch (Exception e) {
+                       exchange.setException(e);
+               }
+       }
+
+       /**
+        * Looks up the QuickFIX/J session registered for the given session ID.
+        * NOTE(review): presumably package-private so tests can substitute a session - confirm.
+        *
+        * @param messageSessionID the ID of the session to look up
+        * @return the result of {@code Session.lookupSession}; callers treat null as an unknown session
+        */
+       Session getSession(SessionID messageSessionID) {
+               return Session.lookupSession(messageSessionID);
+       }
 }

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
 Thu Jun  2 15:26:12 2011
@@ -28,21 +28,41 @@ import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
+import quickfix.FixVersions;
+import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+
 public class QuickfixjConsumerTest {
     private Exchange mockExchange;
     private Processor mockProcessor;
     private Endpoint mockEndpoint;
-
+       private SessionID sessionID;
+       private Message inboundFixMessage;
+    
     @Before
     public void setUp() {
+
         mockExchange = Mockito.mock(Exchange.class);
         org.apache.camel.Message mockCamelMessage = 
Mockito.mock(org.apache.camel.Message.class);
         Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
         
+        inboundFixMessage = new Message();
+        inboundFixMessage.getHeader().setString(BeginString.FIELD, 
FixVersions.BEGINSTRING_FIX44);
+        inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+        inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+        sessionID = MessageUtils.getSessionID(inboundFixMessage);
+
+               
Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
+        
         mockProcessor = Mockito.mock(Processor.class);
         
         mockEndpoint = Mockito.mock(Endpoint.class);
-        
Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
        
+        
Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
  
     }
     
     @Test
@@ -67,8 +87,7 @@ public class QuickfixjConsumerTest {
     }
     
     @Test
-    public void setExceptionOnExchange() throws Exception {
-              
+    public void setExceptionOnExchange() throws Exception {            
         QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, 
mockProcessor);
         consumer.start();
         
@@ -79,5 +98,45 @@ public class QuickfixjConsumerTest {
         consumer.onExchange(mockExchange);
         
         Mockito.verify(mockExchange).setException(exception);
-    }    
+    }
+    
+    @Test
+    public void setExceptionOnInOutExchange() throws Exception {            
+        org.apache.camel.Message mockCamelOutMessage = 
Mockito.mock(org.apache.camel.Message.class);
+        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+        Mockito.when(mockExchange.hasOut()).thenReturn(true);
+        Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage);
+        
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(new 
Message());
+        
+        QuickfixjConsumer consumer = new QuickfixjConsumer(mockEndpoint, 
mockProcessor);
+        consumer.start();
+        
+        // Simulate a message from the FIX engine
+        consumer.onExchange(mockExchange);
+        
+        
Mockito.verify(mockExchange).setException(Mockito.isA(IllegalStateException.class));
+    }
+
+    @Test
+    public void processInOutExchange() throws Exception {
+        org.apache.camel.Message mockCamelOutMessage = 
Mockito.mock(org.apache.camel.Message.class);
+        Mockito.when(mockExchange.hasOut()).thenReturn(true);
+        Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage);
+        Message outboundFixMessage = new Message();
+               
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
+        
+        QuickfixjConsumer consumer = Mockito.spy(new 
QuickfixjConsumer(mockEndpoint, mockProcessor));
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+               
Mockito.doReturn(mockSession).when(consumer).getSession(MessageUtils.getReverseSessionID(inboundFixMessage));
+               
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
+        
+        consumer.start();
+        
+        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+        
+        consumer.onExchange(mockExchange);
+        
+        Mockito.verify(mockExchange, 
Mockito.never()).setException(Mockito.isA(Exception.class));
+        Mockito.verify(mockSession).send(outboundFixMessage);
+    }
 }

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConvertersTest.java
 Thu Jun  2 15:26:12 2011
@@ -98,7 +98,7 @@ public class QuickfixjConvertersTest {
     @Test
     public void convertToExchange() {
         SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
-        QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+        QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", 
camelContext);
         
         Message message = new Message();     
         message.getHeader().setString(MsgType.FIELD, MsgType.ORDER_SINGLE);
@@ -116,7 +116,7 @@ public class QuickfixjConvertersTest {
     @Test
     public void convertToExchangeWithNullMessage() {
         SessionID sessionID = new SessionID("FIX.4.0", "FOO", "BAR");
-        QuickfixjEndpoint endpoint = new QuickfixjEndpoint("", camelContext);
+        QuickfixjEndpoint endpoint = new QuickfixjEndpoint(null, "", 
camelContext);
         
         Exchange exchange = QuickfixjConverters.toExchange(endpoint, 
sessionID, null, QuickfixjEventCategory.AppMessageSent);
         

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
 Thu Jun  2 15:26:12 2011
@@ -16,33 +16,133 @@
  */
 package org.apache.camel.component.quickfixj;
 
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.management.JMException;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import quickfix.ConfigError;
+import quickfix.FieldConvertError;
 import quickfix.FixVersions;
 import quickfix.Message;
+import quickfix.MessageUtils;
+import quickfix.Session;
 import quickfix.SessionID;
+import quickfix.field.BeginString;
+import quickfix.field.MsgType;
+import quickfix.field.SenderCompID;
+import quickfix.field.TargetCompID;
+import quickfix.fix42.Email;
 
 public class QuickfixjProducerTest {
-
+       private Exchange mockExchange;
+       private QuickfixjEndpoint mockEndpoint;
+       private org.apache.camel.Message mockCamelMessage;
+       private QuickfixjProducer producer;
+       private SessionID sessionID;
+       private Message inboundFixMessage;
+       private QuickfixjEngine quickfixjEngine;
+
+       @Before
+       public void setUp() throws ConfigError, FieldConvertError, IOException, 
JMException {
+        mockExchange = Mockito.mock(Exchange.class);
+        mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
+        mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
+        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
+        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+        
+        quickfixjEngine = TestSupport.createEngine();
+        Mockito.when(mockEndpoint.getEngine()).thenReturn(quickfixjEngine);
+        
+        inboundFixMessage = new Message();
+        inboundFixMessage.getHeader().setString(BeginString.FIELD, 
FixVersions.BEGINSTRING_FIX44);
+        inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
+        inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
+        sessionID = MessageUtils.getSessionID(inboundFixMessage);
+   
+        
Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage);
+
+        Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);     
+
+        producer = Mockito.spy(new QuickfixjProducer(mockEndpoint));
+       }
+       
+       @SuppressWarnings("serial")
+       public class TestException extends RuntimeException {
+               
+       }
+       
     @Test
     public void setExceptionOnExchange() throws Exception {
-        Exchange mockExchange = Mockito.mock(Exchange.class);
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+               
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+               Mockito.doThrow(new 
TestException()).when(mockSession).send(Mockito.isA(Message.class));
 
-        QuickfixjEndpoint mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
-        org.apache.camel.Message mockCamelMessage = 
Mockito.mock(org.apache.camel.Message.class);
-        Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage);
-        Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(new 
Message());
+               producer.process(mockExchange);    
+        
Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class));
+    }
+    
+    @Test
+    public void processInOnlyExchange() throws Exception {        
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+               
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+               
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
+        
+        producer.process(mockExchange);
+        
+        Mockito.verify(mockExchange, 
Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+        Mockito.verify(mockSession).send(inboundFixMessage);
+    }
 
-        SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, 
"SENDER", "TARGET");       
-        Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);
+    @Test
+    public void processInOutExchange() throws Exception {
+        
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
+        
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).
+               thenReturn(new MessagePredicate(sessionID, MsgType.EMAIL));
+        Mockito.when(mockExchange.getProperty(
+                               QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+                               1000L, Long.class)).thenReturn(5000L);
+                               
+        org.apache.camel.Message mockOutboundCamelMessage = 
Mockito.mock(org.apache.camel.Message.class);
+               
Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
         
-        QuickfixjProducer producer = new QuickfixjProducer(mockEndpoint);
+        final Message outboundFixMessage = new Email();
+        outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET");
+        outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER");
         
+        Session mockSession = 
Mockito.spy(TestSupport.createSession(sessionID));
+               
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+               Mockito.doAnswer(new Answer<Boolean>() {
+                       @Override
+                       public Boolean answer(InvocationOnMock invocation) 
throws Throwable {
+                               new Timer().schedule(new TimerTask() {          
                
+                                       @Override
+                                       public void run() {
+                                               try {
+                                                       
quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived,
 sessionID, outboundFixMessage);
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                               }
+                                       }
+                               }, 10);
+                               return true;
+                       }                       
+               }).when(mockSession).send(Mockito.isA(Message.class));
+
         producer.process(mockExchange);
         
-        
Mockito.verify(mockExchange).setException(Matchers.isA(IllegalStateException.class));
+        Mockito.verify(mockExchange, 
Mockito.never()).setException(Matchers.isA(IllegalStateException.class));
+        Mockito.verify(mockSession).send(inboundFixMessage);
+        Mockito.verify(mockOutboundCamelMessage).setBody(outboundFixMessage);
     }
 }

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
 Thu Jun  2 15:26:12 2011
@@ -19,7 +19,23 @@ package org.apache.camel.component.quick
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Date;
 
+import javax.management.JMException;
+
+import org.mockito.Mockito;
+
+import quickfix.Acceptor;
+import quickfix.Application;
+import quickfix.ConfigError;
+import quickfix.DefaultSessionFactory;
+import quickfix.FieldConvertError;
+import quickfix.LogFactory;
+import quickfix.MessageFactory;
+import quickfix.MessageStore;
+import quickfix.MessageStoreFactory;
+import quickfix.Session;
+import quickfix.SessionFactory;
 import quickfix.SessionID;
 import quickfix.SessionSettings;
 import quickfix.field.EmailThreadID;
@@ -55,4 +71,51 @@ public final class TestSupport {
         email.addGroup(text);
         return email;
     }
+    
+    public static Session createSession(SessionID sessionID) throws 
ConfigError, IOException {
+       MessageStoreFactory mockMessageStoreFactory = 
Mockito.mock(MessageStoreFactory.class);
+       MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
+       Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date());
+       
+               
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
+       
+               DefaultSessionFactory factory = new DefaultSessionFactory(
+                       Mockito.mock(Application.class),
+                       mockMessageStoreFactory,
+                       Mockito.mock(LogFactory.class));
+       
+       SessionSettings settings = new SessionSettings();
+       settings.setLong(Session.SETTING_HEARTBTINT, 10);
+       settings.setString(Session.SETTING_START_TIME, "00:00:00");
+       settings.setString(Session.SETTING_END_TIME, "00:00:00");
+       settings.setString(SessionFactory.SETTING_CONNECTION_TYPE, 
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+       settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+       
+               return factory.create(sessionID, settings);
+                       
+    }
+    
+    public static QuickfixjEngine createEngine() throws ConfigError, 
FieldConvertError, IOException, JMException {     
+       SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
+
+       MessageStoreFactory mockMessageStoreFactory = 
Mockito.mock(MessageStoreFactory.class);
+       MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
+       Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date());
+               
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
+       
+       SessionSettings settings = new SessionSettings();
+       
+       settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10);
+       settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00");
+       settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00");
+       settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, 
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+       settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000);
+       settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, false);
+       
+               return new QuickfixjEngine("", settings, false, 
+                               mockMessageStoreFactory, 
+                               Mockito.mock(LogFactory.class), 
+                               Mockito.mock(MessageFactory.class));
+                       
+    }
 }

Added: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java?rev=1130604&view=auto
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
 (added)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
 Thu Jun  2 15:26:12 2011
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.quickfixj.examples;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Body;
+import org.apache.camel.Exchange;
+import org.apache.camel.Header;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.quickfixj.MessagePredicate;
+import org.apache.camel.component.quickfixj.QuickfixjComponent;
+import org.apache.camel.component.quickfixj.QuickfixjEndpoint;
+import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
+import org.apache.camel.component.quickfixj.QuickfixjProducer;
+import 
org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer;
+import 
org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.eclipse.jetty.util.log.Log;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import quickfix.FieldNotFound;
+import quickfix.SessionID;
+import quickfix.field.AvgPx;
+import quickfix.field.ClOrdID;
+import quickfix.field.CumQty;
+import quickfix.field.ExecID;
+import quickfix.field.ExecTransType;
+import quickfix.field.ExecType;
+import quickfix.field.LeavesQty;
+import quickfix.field.MsgType;
+import quickfix.field.OrdStatus;
+import quickfix.field.OrderID;
+import quickfix.field.Side;
+import quickfix.field.Symbol;
+import quickfix.fix42.ExecutionReport;
+import quickfix.fix42.OrderStatusRequest;
+
+public class RequestReplyExample {
+    private static final Logger LOG = 
LoggerFactory.getLogger(QuickfixjComponent.class);
+
+    public static void main(String[] args) throws Exception {
+        new RequestReplyExample().run();
+    }
+    
+    public void run() throws Exception {        
+        DefaultCamelContext context = new DefaultCamelContext();
+        final CountDownLatch logonLatch = new CountDownLatch(1);
+        final String orderStatusServiceUrl = 
"http://localhost:9123/order/status";;
+        
+        RouteBuilder routes = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+               // Synchronize the logon so we don't start sending status requests too early
+                
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET").
+                    
filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
+                    bean(new CountDownLatchDecrementer("logon", logonLatch));
+
+                // Incoming status requests are converted to InOut exchange pattern and passed to the
+                // order status service. The response is sent back to the session making the request.
+                
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut").
+                       
filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST)).
+                    bean(new MarketOrderStatusService());
+                
+                from ("jetty:" + orderStatusServiceUrl).
+                       bean(new OrderStatusRequestTransformer()).
+                       routingSlip(bean(FixSessionRouter.class, "route")).
+                       bean(new QuickfixjMessageJsonTransformer());
+            }
+        };
+        
+        context.addRoutes(routes);
+        
+        LOG.info("Starting Camel context");
+        context.start();
+        
+        if (!logonLatch.await(5L, TimeUnit.SECONDS)) {
+            throw new IllegalStateException("Logon did not succeed");
+        }
+        
+        // Send a request to the order status web service.
+        // Verify that the response is a JSON response.
+        
+        URL orderStatusUrl = new URL(orderStatusServiceUrl + 
"?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc");
+               HttpURLConnection connection = (HttpURLConnection) 
orderStatusUrl.openConnection();
+        BufferedReader orderStatusReply = new BufferedReader(new 
InputStreamReader(connection.getInputStream()));
+        String line = orderStatusReply.readLine();
+        if (!line.equals("\"message\": {")) {
+               throw new Exception("Don't appear to be a JSON response");
+        }
+        else {
+               StringBuilder sb = new StringBuilder();
+               while (line != null) {
+                       sb.append(line);
+                       sb.append('\n');
+                       line = orderStatusReply.readLine();
+               }
+               Log.info("Web request response:\n" + sb);
+               
+        }
+        orderStatusReply.close();
+        
+        LOG.info("Shutting down Camel context");
+        context.stop();
+        
+        LOG.info("Example complete");
+    }
+        
+    public static class OrderStatusRequestTransformer {
+       public void transform(Exchange exchange) throws FieldNotFound {
+               String sessionID = (String) 
exchange.getIn().getHeader("sessionID");
+               String orderID = (String) exchange.getIn().getHeader("orderID");
+               
+            OrderStatusRequest request = new OrderStatusRequest(new 
ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY));
+            request.set(new OrderID(orderID));
+             
+            // Look for a reply execution report back to the requester session
+            // and having the requested OrderID. This is a loose correlation but the best
+            // we can do with FIX 4.2. Newer versions of FIX have an optional explicit correlation field.
+            exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY,
+                       new MessagePredicate(new SessionID(sessionID), 
MsgType.EXECUTION_REPORT)
+                               .withField(OrderID.FIELD, 
request.getString(OrderID.FIELD)));
+            
+            exchange.getIn().setBody(request);
+       }
+    }
+    
+    public static class MarketOrderStatusService {
+        private static final Logger LOG = 
LoggerFactory.getLogger(QuickfixjComponent.class);
+        
+        public ExecutionReport getOrderStatus(OrderStatusRequest request) 
throws FieldNotFound {
+               LOG.info("Received order status request for orderId=" + 
request.getOrderID().getValue());
+               return new ExecutionReport(
+                               request.getOrderID(), new 
ExecID(UUID.randomUUID().toString()),
+                                               new 
ExecTransType(ExecTransType.STATUS), 
+                                               new ExecType(ExecType.REJECTED),
+                                               new 
OrdStatus(OrdStatus.REJECTED),
+                                               new Symbol("GOOG"),
+                                               new Side(Side.BUY),
+                                               new LeavesQty(100),
+                                               new CumQty(0),
+                                               new AvgPx(0));
+       }
+    }
+    
+    public static class FixSessionRouter {
+       public String route(@Header("sessionID") String sessionID, @Body Object 
body) {
+               return 
String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID);
+       }
+    }
+}

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java
 Thu Jun  2 15:26:12 2011
@@ -63,7 +63,6 @@ public class TradeExecutorExample {
             @Override
             public void configure() throws Exception {
                 // Release latch when session logon events are received
-                // We expect four logon events (four sessions)
                 from("quickfix:examples/inprocess.cfg").
                     
filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
                     bean(new CountDownLatchDecrementer("logon", logonLatch));

Modified: 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
 (original)
+++ 
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
 Thu Jun  2 15:26:12 2011
@@ -18,16 +18,32 @@ package org.apache.camel.component.quick
 
 import java.util.Iterator;
 
+import quickfix.ConfigError;
 import quickfix.DataDictionary;
 import quickfix.Field;
 import quickfix.FieldMap;
+import quickfix.FieldNotFound;
 import quickfix.FieldType;
 import quickfix.Group;
 import quickfix.Message;
-
+import quickfix.MessageUtils;
+import quickfix.Session;
+import quickfix.SessionID;
 
 public class QuickfixjMessageJsonTransformer {
-    
+   
+       public String transform(Message message) throws FieldNotFound, 
ConfigError {
+               SessionID sessionID = MessageUtils.getSessionID(message);
+        Session session = Session.lookupSession(sessionID);
+        DataDictionary dataDictionary = session.getDataDictionary();
+        
+        if (dataDictionary == null) {
+            throw new IllegalStateException("No Data Dictionary. Exchange must 
reference an existing session");
+        }
+        
+               return transform(message, dataDictionary);
+       }
+       
     public String transform(Message message, DataDictionary dataDictionary) {
         return transform(message, "", dataDictionary);
     }

Modified: 
camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg?rev=1130604&r1=1130603&r2=1130604&view=diff
==============================================================================
--- 
camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg 
(original)
+++ 
camel/trunk/components/camel-quickfix/src/test/resources/examples/inprocess.cfg 
Thu Jun  2 15:26:12 2011
@@ -23,6 +23,7 @@
 UseJmx=Y
 SocketAcceptProtocol=VM_PIPE
 SocketConnectProtocol=VM_PIPE
+HeartBtInt=120
 
 #
 # Initiator for simulating a trader


Reply via email to