Author: hadrian
Date: Thu Jun 2 17:15:30 2011
New Revision: 1130661
URL: http://svn.apache.org/viewvc?rev=1130661&view=rev
Log:
CAMEL-4038. Checkstyle fixes.
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessageCorrelator.java
Thu Jun 2 17:15:30 2011
@@ -28,82 +28,74 @@ import quickfix.Message;
import quickfix.SessionID;
public class MessageCorrelator implements QuickfixjEventListener {
- public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L;
- private final CopyOnWriteArrayList<MessageCorrelationRule> rules = new
CopyOnWriteArrayList<MessageCorrelationRule>();
+ public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L;
+ private final CopyOnWriteArrayList<MessageCorrelationRule> rules = new
CopyOnWriteArrayList<MessageCorrelationRule>();
- public Callable<Message> getReply(SessionID sessionID, Exchange
exchange)
- throws InterruptedException, ExchangeTimedOutException {
-
- MessagePredicate messageCriteria = (MessagePredicate) exchange
-
.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY);
- final MessageCorrelationRule correlationRule = new
MessageCorrelationRule(
- exchange, sessionID, messageCriteria);
-
- final long timeout = exchange.getProperty(
- QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
- DEFAULT_CORRELATION_TIMEOUT, Long.class);
-
- rules.add(correlationRule);
-
- return new Callable<Message>() {
- @Override
- public Message call() throws Exception {
- if (!correlationRule.getLatch().await(timeout,
- TimeUnit.MILLISECONDS)) {
- throw new ExchangeTimedOutException(
-
correlationRule.getExchange(), timeout);
- }
- return correlationRule.getReplyMessage();
- }
- };
- }
-
- @Override
- public void onEvent(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) throws Exception {
- if (message != null) {
- for (MessageCorrelationRule rule : rules) {
- if
(rule.getMessageCriteria().evaluate(message)) {
- rule.setReplyMessage(message);
- rules.remove(rule);
- rule.getLatch().countDown();
- }
- }
- }
- }
-
- private class MessageCorrelationRule {
- private final Exchange exchange;
- private final CountDownLatch latch = new CountDownLatch(1);
- private final MessagePredicate messageCriteria;
-
- private Message replyMessage;
-
- public MessageCorrelationRule(Exchange exchange, SessionID
sessionID,
- MessagePredicate messageCriteria) {
- this.exchange = exchange;
- this.messageCriteria = messageCriteria;
- }
-
- public void setReplyMessage(Message message) {
- this.replyMessage = message;
- }
-
- public Message getReplyMessage() {
- return replyMessage;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public Exchange getExchange() {
- return exchange;
- }
-
- public MessagePredicate getMessageCriteria() {
- return messageCriteria;
- }
- }
+ public Callable<Message> getReply(SessionID sessionID, Exchange exchange)
+ throws InterruptedException, ExchangeTimedOutException {
+ MessagePredicate messageCriteria = (MessagePredicate)
exchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY);
+ final MessageCorrelationRule correlationRule = new
MessageCorrelationRule(exchange, sessionID, messageCriteria);
+ rules.add(correlationRule);
+
+ final long timeout = exchange.getProperty(
+ QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+ DEFAULT_CORRELATION_TIMEOUT, Long.class);
+
+ return new Callable<Message>() {
+ @Override
+ public Message call() throws Exception {
+ if (!correlationRule.getLatch().await(timeout,
TimeUnit.MILLISECONDS)) {
+ throw new
ExchangeTimedOutException(correlationRule.getExchange(), timeout);
+ }
+ return correlationRule.getReplyMessage();
+ }
+ };
+ }
+
+ @Override
+ public void onEvent(QuickfixjEventCategory eventCategory, SessionID
sessionID, Message message) throws Exception {
+ if (message != null) {
+ for (MessageCorrelationRule rule : rules) {
+ if (rule.getMessageCriteria().evaluate(message)) {
+ rule.setReplyMessage(message);
+ rules.remove(rule);
+ rule.getLatch().countDown();
+ }
+ }
+ }
+ }
+
+ private class MessageCorrelationRule {
+ private final Exchange exchange;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final MessagePredicate messageCriteria;
+
+ private Message replyMessage;
+
+ public MessageCorrelationRule(Exchange exchange, SessionID sessionID,
MessagePredicate messageCriteria) {
+ this.exchange = exchange;
+ this.messageCriteria = messageCriteria;
+ }
+
+ public void setReplyMessage(Message message) {
+ this.replyMessage = message;
+ }
+
+ public Message getReplyMessage() {
+ return replyMessage;
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+
+ public MessagePredicate getMessageCriteria() {
+ return messageCriteria;
+ }
+ }
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/MessagePredicate.java
Thu Jun 2 17:15:30 2011
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.quickfixj;
import java.util.ArrayList;
@@ -13,56 +29,56 @@ import quickfix.field.SenderCompID;
import quickfix.field.TargetCompID;
public class MessagePredicate {
- private final List<Field<String>> headerCriteria = new
ArrayList<Field<String>>();
- private final List<Field<String>> bodyCriteria = new
ArrayList<Field<String>>();
-
- public MessagePredicate(SessionID requestingSessionID, String msgType) {
- // Reverse session ID for reply
- // TODO may need to optionally include subID and locationID
- addHeaderFieldIfPresent(SenderCompID.FIELD,
requestingSessionID.getTargetCompID());
- addHeaderFieldIfPresent(TargetCompID.FIELD,
requestingSessionID.getSenderCompID());
- withMessageType(msgType);
- }
-
- private void addHeaderFieldIfPresent(int tag, String value) {
- if (value != null && !"".equals(value)) {
- withHeaderField(tag, value);
- }
- }
-
- public boolean evaluate(Message message) {
- return evaluate(message, bodyCriteria) &&
evaluate(message.getHeader(), headerCriteria);
- }
-
- private boolean evaluate(FieldMap fieldMap, List<Field<String>>
criteria) {
- for (Field<String> c : criteria) {
- String value = null;
- try {
- if (fieldMap.isSetField(c.getField())) {
- value =
fieldMap.getString(c.getField());
- }
- } catch (FieldNotFound e) {
- // ignored, shouldn't happen
- }
- if (!c.getObject().equals(value)) {
- return false;
- }
- }
- return true;
- }
-
- public MessagePredicate withField(int tag, String value) {
- bodyCriteria.add(new Field<String>(tag, value));
- return this;
- }
-
- public MessagePredicate withHeaderField(int tag, String value) {
- headerCriteria.add(new Field<String>(tag, value));
- return this;
- }
-
- private MessagePredicate withMessageType(String msgType) {
- headerCriteria.add(new Field<String>(MsgType.FIELD, msgType));
- return this;
- }
+ private final List<Field<String>> headerCriteria = new
ArrayList<Field<String>>();
+ private final List<Field<String>> bodyCriteria = new
ArrayList<Field<String>>();
+
+ public MessagePredicate(SessionID requestingSessionID, String msgType) {
+ // Reverse session ID for reply
+ // TODO may need to optionally include subID and locationID
+ addHeaderFieldIfPresent(SenderCompID.FIELD,
requestingSessionID.getTargetCompID());
+ addHeaderFieldIfPresent(TargetCompID.FIELD,
requestingSessionID.getSenderCompID());
+ withMessageType(msgType);
+ }
+
+ private void addHeaderFieldIfPresent(int tag, String value) {
+ if (value != null && !"".equals(value)) {
+ withHeaderField(tag, value);
+ }
+ }
+
+ public boolean evaluate(Message message) {
+ return evaluate(message, bodyCriteria) &&
evaluate(message.getHeader(), headerCriteria);
+ }
+
+ private boolean evaluate(FieldMap fieldMap, List<Field<String>> criteria) {
+ for (Field<String> c : criteria) {
+ String value = null;
+ try {
+ if (fieldMap.isSetField(c.getField())) {
+ value = fieldMap.getString(c.getField());
+ }
+ } catch (FieldNotFound e) {
+ // ignored, shouldn't happen
+ }
+ if (!c.getObject().equals(value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public MessagePredicate withField(int tag, String value) {
+ bodyCriteria.add(new Field<String>(tag, value));
+ return this;
+ }
+
+ public MessagePredicate withHeaderField(int tag, String value) {
+ headerCriteria.add(new Field<String>(tag, value));
+ return this;
+ }
+
+ private MessagePredicate withMessageType(String msgType) {
+ headerCriteria.add(new Field<String>(MsgType.FIELD, msgType));
+ return this;
+ }
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjConsumer.java
Thu Jun 2 17:15:30 2011
@@ -28,7 +28,7 @@ import quickfix.SessionID;
public class QuickfixjConsumer extends DefaultConsumer {
- public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
+ public QuickfixjConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@@ -37,7 +37,7 @@ public class QuickfixjConsumer extends D
try {
getProcessor().process(exchange);
if (exchange.getPattern().isOutCapable() && exchange.hasOut())
{
- sendOutMessage(exchange);
+ sendOutMessage(exchange);
}
} catch (Exception e) {
exchange.setException(e);
@@ -45,29 +45,29 @@ public class QuickfixjConsumer extends D
}
}
- private void sendOutMessage(Exchange exchange) {
- try {
- Message camelMessage = exchange.getOut();
- quickfix.Message quickfixjMessage =
camelMessage.getBody(quickfix.Message.class);
-
- if (log.isDebugEnabled()) {
- log.debug("Sending FIX message reply: " +
quickfixjMessage.toString());
- }
-
- SessionID messageSessionID =
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
-
- Session session = getSession(messageSessionID);
- if (session == null) {
- throw new IllegalStateException("Unknown session: "
+ messageSessionID);
- }
-
- session.send(quickfixjMessage);
- } catch (Exception e) {
- exchange.setException(e);
- }
- }
+ private void sendOutMessage(Exchange exchange) {
+ try {
+ Message camelMessage = exchange.getOut();
+ quickfix.Message quickfixjMessage =
camelMessage.getBody(quickfix.Message.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Sending FIX message reply: " +
quickfixjMessage.toString());
+ }
+
+ SessionID messageSessionID =
MessageUtils.getReverseSessionID(exchange.getIn().getBody(quickfix.Message.class));
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: " +
messageSessionID);
+ }
+
+ session.send(quickfixjMessage);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ }
- Session getSession(SessionID messageSessionID) {
- return Session.lookupSession(messageSessionID);
- }
+ Session getSession(SessionID messageSessionID) {
+ return Session.lookupSession(messageSessionID);
+ }
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
Thu Jun 2 17:15:30 2011
@@ -34,115 +34,106 @@ import org.slf4j.LoggerFactory;
import quickfix.Message;
import quickfix.SessionID;
-public class QuickfixjEndpoint extends DefaultEndpoint implements
- QuickfixjEventListener, MultipleConsumersSupport {
- public static final String EVENT_CATEGORY_KEY = "EventCategory";
- public static final String SESSION_ID_KEY = "SessionID";
- public static final String MESSAGE_TYPE_KEY = "MessageType";
- public static final String DATA_DICTIONARY_KEY = "DataDictionary";
-
- private static final Logger LOG = LoggerFactory
- .getLogger(QuickfixjEndpoint.class);
-
- private SessionID sessionID;
- private final List<QuickfixjConsumer> consumers = new
CopyOnWriteArrayList<QuickfixjConsumer>();
- private final QuickfixjEngine engine;
-
- public QuickfixjEndpoint(QuickfixjEngine engine, String uri,
CamelContext context) {
- super(uri, context);
- this.engine = engine;
- }
-
- public SessionID getSessionID() {
- return sessionID;
- }
-
- public void setSessionID(SessionID sessionID) {
- this.sessionID = sessionID;
- }
-
- public Consumer createConsumer(Processor processor) throws Exception {
- LOG.info("Creating QuickFIX/J consumer: "
- + (sessionID != null ? sessionID : "No Session")
- + ", ExchangePattern=" + getExchangePattern());
- QuickfixjConsumer consumer = new QuickfixjConsumer(this,
processor);
- consumers.add(consumer);
- return consumer;
- }
-
- public Producer createProducer() throws Exception {
- LOG.info("Creating QuickFIX/J producer: "
- + (sessionID != null ? sessionID : "No
Session"));
- if (isWildcarded()) {
- throw new ResolveEndpointFailedException(
- "Cannot create consumer on wildcarded
session identifier: "
- + sessionID);
- }
- return new QuickfixjProducer(this);
- }
-
- public boolean isSingleton() {
- return true;
- }
-
- public void onEvent(QuickfixjEventCategory eventCategory,
- SessionID sessionID, Message message) throws Exception {
- if (this.sessionID == null || isMatching(sessionID)) {
- for (QuickfixjConsumer consumer : consumers) {
- Exchange exchange =
QuickfixjConverters.toExchange(this,
- sessionID, message,
eventCategory);
- consumer.onExchange(exchange);
- if (exchange.getException() != null) {
- throw exchange.getException();
- }
- }
- }
- }
-
- private boolean isMatching(SessionID sessionID) {
- return this.sessionID.equals(sessionID)
- || (isMatching(this.sessionID.getBeginString(),
- sessionID.getBeginString())
- &&
isMatching(this.sessionID.getSenderCompID(),
-
sessionID.getSenderCompID())
- &&
isMatching(this.sessionID.getSenderSubID(),
-
sessionID.getSenderSubID())
- &&
isMatching(this.sessionID.getSenderLocationID(),
-
sessionID.getSenderLocationID())
- &&
isMatching(this.sessionID.getTargetCompID(),
-
sessionID.getTargetCompID())
- &&
isMatching(this.sessionID.getTargetSubID(),
-
sessionID.getTargetSubID()) && isMatching(
-
this.sessionID.getTargetLocationID(),
-
sessionID.getTargetLocationID()));
- }
-
- private boolean isMatching(String s1, String s2) {
- return s1.equals("") || s1.equals("*") || s1.equals(s2);
- }
-
- private boolean isWildcarded() {
- return sessionID != null
- && (sessionID.getBeginString().equals("*")
- ||
sessionID.getSenderCompID().equals("*")
- ||
sessionID.getSenderSubID().equals("*")
- ||
sessionID.getSenderLocationID().equals("*")
- ||
sessionID.getTargetCompID().equals("*")
- ||
sessionID.getTargetSubID().equals("*") || sessionID
-
.getTargetLocationID().equals("*"));
- }
-
- public boolean isMultipleConsumersSupported() {
- return true;
- }
-
- public QuickfixjEngine getEngine() {
- return engine;
- }
-
- @Override
- protected void doStop() throws Exception {
- // clear list of consumers
- consumers.clear();
- }
+public class QuickfixjEndpoint extends DefaultEndpoint implements
QuickfixjEventListener, MultipleConsumersSupport {
+ public static final String EVENT_CATEGORY_KEY = "EventCategory";
+ public static final String SESSION_ID_KEY = "SessionID";
+ public static final String MESSAGE_TYPE_KEY = "MessageType";
+ public static final String DATA_DICTIONARY_KEY = "DataDictionary";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(QuickfixjEndpoint.class);
+
+ private SessionID sessionID;
+ private final List<QuickfixjConsumer> consumers = new
CopyOnWriteArrayList<QuickfixjConsumer>();
+ private final QuickfixjEngine engine;
+
+ public QuickfixjEndpoint(QuickfixjEngine engine, String uri, CamelContext
context) {
+ super(uri, context);
+ this.engine = engine;
+ }
+
+ public SessionID getSessionID() {
+ return sessionID;
+ }
+
+ public void setSessionID(SessionID sessionID) {
+ this.sessionID = sessionID;
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ LOG.info("Creating QuickFIX/J consumer: "
+ + (sessionID != null ? sessionID : "No Session")
+ + ", ExchangePattern=" + getExchangePattern());
+ QuickfixjConsumer consumer = new QuickfixjConsumer(this, processor);
+ consumers.add(consumer);
+ return consumer;
+ }
+
+ public Producer createProducer() throws Exception {
+ LOG.info("Creating QuickFIX/J producer: "
+ + (sessionID != null ? sessionID : "No Session"));
+ if (isWildcarded()) {
+ throw new ResolveEndpointFailedException("Cannot create consumer
on wildcarded session identifier: " + sessionID);
+ }
+ return new QuickfixjProducer(this);
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public void onEvent(QuickfixjEventCategory eventCategory, SessionID
sessionID, Message message) throws Exception {
+ if (this.sessionID == null || isMatching(sessionID)) {
+ for (QuickfixjConsumer consumer : consumers) {
+ Exchange exchange = QuickfixjConverters.toExchange(this,
sessionID, message, eventCategory);
+ consumer.onExchange(exchange);
+ if (exchange.getException() != null) {
+ throw exchange.getException();
+ }
+ }
+ }
+ }
+
+ private boolean isMatching(SessionID sessionID) {
+ if (this.sessionID.equals(sessionID)) {
+ return true;
+ }
+ return isMatching(this.sessionID.getBeginString(),
sessionID.getBeginString())
+ && isMatching(this.sessionID.getSenderCompID(),
sessionID.getSenderCompID())
+ && isMatching(this.sessionID.getSenderSubID(),
sessionID.getSenderSubID())
+ && isMatching(this.sessionID.getSenderLocationID(),
sessionID.getSenderLocationID())
+ && isMatching(this.sessionID.getTargetCompID(),
sessionID.getTargetCompID())
+ && isMatching(this.sessionID.getTargetSubID(),
sessionID.getTargetSubID())
+ && isMatching(this.sessionID.getTargetLocationID(),
sessionID.getTargetLocationID());
+ }
+
+ private boolean isMatching(String s1, String s2) {
+ return s1.equals("") || s1.equals("*") || s1.equals(s2);
+ }
+
+ private boolean isWildcarded() {
+ if (sessionID == null) {
+ return false;
+ }
+ return sessionID.getBeginString().equals("*")
+ || sessionID.getSenderCompID().equals("*")
+ || sessionID.getSenderSubID().equals("*")
+ || sessionID.getSenderLocationID().equals("*")
+ || sessionID.getTargetCompID().equals("*")
+ || sessionID.getTargetSubID().equals("*")
+ || sessionID.getTargetLocationID().equals("*");
+ }
+
+ public boolean isMultipleConsumersSupported() {
+ return true;
+ }
+
+ public QuickfixjEngine getEngine() {
+ return engine;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // clear list of consumers
+ consumers.clear();
+ }
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java
Thu Jun 2 17:15:30 2011
@@ -120,17 +120,14 @@ public class QuickfixjEngine {
MessageStoreFactory messageStoreFactoryOverride, LogFactory
sessionLogFactoryOverride,
MessageFactory messageFactoryOverride) throws ConfigError,
FieldConvertError, IOException, JMException {
- addEventListener(messageCorrelator);
-
+ addEventListener(messageCorrelator);
+
this.uri = uri;
this.forcedShutdown = forcedShutdown;
messageFactory = messageFactoryOverride != null ?
messageFactoryOverride : new DefaultMessageFactory();
-
sessionLogFactory = sessionLogFactoryOverride != null ?
sessionLogFactoryOverride : inferLogFactory(settings);
-
- messageStoreFactory = messageStoreFactoryOverride != null ?
messageStoreFactoryOverride
- : inferMessageStoreFactory(settings);
+ messageStoreFactory = messageStoreFactoryOverride != null ?
messageStoreFactoryOverride : inferMessageStoreFactory(settings);
// Set default session schedule if not specified in configuration
if (!settings.isSetting(Session.SETTING_START_TIME)) {
@@ -165,15 +162,15 @@ public class QuickfixjEngine {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
if (isConnectorRole(settings,
SessionFactory.ACCEPTOR_CONNECTION_TYPE)) {
- acceptor = createAcceptor(new Dispatcher(), settings,
messageStoreFactory, sessionLogFactory,
- messageFactory, threadModel);
+ acceptor = createAcceptor(new Dispatcher(), settings,
messageStoreFactory,
+ sessionLogFactory, messageFactory, threadModel);
} else {
acceptor = null;
}
if (isConnectorRole(settings,
SessionFactory.INITIATOR_CONNECTION_TYPE)) {
- initiator = createInitiator(new Dispatcher(), settings,
messageStoreFactory, sessionLogFactory,
- messageFactory, threadModel);
+ initiator = createInitiator(new Dispatcher(), settings,
messageStoreFactory,
+ sessionLogFactory, messageFactory, threadModel);
} else {
initiator = null;
}
@@ -467,8 +464,8 @@ public class QuickfixjEngine {
}
public MessageCorrelator getMessageCorrelator() {
- return messageCorrelator;
- }
+ return messageCorrelator;
+ }
// For Testing
Initiator getInitiator() {
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEventListener.java
Thu Jun 2 17:15:30 2011
@@ -24,7 +24,5 @@ import quickfix.SessionID;
* @see QuickfixjEngine
*/
public interface QuickfixjEventListener {
-
void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID,
Message message) throws Exception;
-
}
Modified:
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
(original)
+++
camel/trunk/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java
Thu Jun 2 17:15:30 2011
@@ -30,8 +30,8 @@ import quickfix.SessionID;
public class QuickfixjProducer extends DefaultProducer {
public static final String CORRELATION_TIMEOUT_KEY = "CorrelationTimeout";
public static final String CORRELATION_CRITERIA_KEY =
"CorrelationCriteria";
-
- private final SessionID sessionID;
+
+ private final SessionID sessionID;
public QuickfixjProducer(Endpoint endpoint) {
super(endpoint);
@@ -42,47 +42,43 @@ public class QuickfixjProducer extends D
sendMessage(exchange, exchange.getIn());
}
- void sendMessage(Exchange exchange, org.apache.camel.Message
camelMessage)
- throws InterruptedException {
- try {
- Message message = camelMessage.getBody(Message.class);
- if (log.isDebugEnabled()) {
- log.debug("Sending FIX message: " +
message.toString());
- }
-
- SessionID messageSessionID = sessionID;
- if (messageSessionID == null) {
- messageSessionID =
MessageUtils.getSessionID(message);
- }
-
- Session session = getSession(messageSessionID);
- if (session == null) {
- throw new IllegalStateException("Unknown
session: "
- + messageSessionID);
- }
-
- Callable<Message> callable = null;
-
- if (exchange.getPattern().isOutCapable()) {
- QuickfixjEndpoint endpoint =
(QuickfixjEndpoint) getEndpoint();
- MessageCorrelator messageCorrelator =
endpoint.getEngine()
- .getMessageCorrelator();
- callable = messageCorrelator.getReply(
- endpoint.getSessionID(),
exchange);
- }
-
- session.send(message);
-
- if (callable != null) {
- Message reply = callable.call();
- exchange.getOut().setBody(reply);
- }
- } catch (Exception e) {
- exchange.setException(e);
- }
- }
-
- Session getSession(SessionID messageSessionID) {
- return Session.lookupSession(messageSessionID);
- }
+ void sendMessage(Exchange exchange, org.apache.camel.Message camelMessage)
throws InterruptedException {
+ try {
+ Message message = camelMessage.getBody(Message.class);
+ if (log.isDebugEnabled()) {
+ log.debug("Sending FIX message: " + message.toString());
+ }
+
+ SessionID messageSessionID = sessionID;
+ if (messageSessionID == null) {
+ messageSessionID = MessageUtils.getSessionID(message);
+ }
+
+ Session session = getSession(messageSessionID);
+ if (session == null) {
+ throw new IllegalStateException("Unknown session: " +
messageSessionID);
+ }
+
+ Callable<Message> callable = null;
+
+ if (exchange.getPattern().isOutCapable()) {
+ QuickfixjEndpoint endpoint = (QuickfixjEndpoint) getEndpoint();
+ MessageCorrelator messageCorrelator =
endpoint.getEngine().getMessageCorrelator();
+ callable = messageCorrelator.getReply(endpoint.getSessionID(),
exchange);
+ }
+
+ session.send(message);
+
+ if (callable != null) {
+ Message reply = callable.call();
+ exchange.getOut().setBody(reply);
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ }
+
+ Session getSession(SessionID messageSessionID) {
+ return Session.lookupSession(messageSessionID);
+ }
}
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjConsumerTest.java
Thu Jun 2 17:15:30 2011
@@ -41,8 +41,8 @@ public class QuickfixjConsumerTest {
private Exchange mockExchange;
private Processor mockProcessor;
private Endpoint mockEndpoint;
- private SessionID sessionID;
- private Message inboundFixMessage;
+ private SessionID sessionID;
+ private Message inboundFixMessage;
@Before
public void setUp() {
@@ -56,11 +56,9 @@ public class QuickfixjConsumerTest {
inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER");
inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET");
sessionID = MessageUtils.getSessionID(inboundFixMessage);
-
-
Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
-
- mockProcessor = Mockito.mock(Processor.class);
+
Mockito.when(mockCamelMessage.getBody(quickfix.Message.class)).thenReturn(inboundFixMessage);
+ mockProcessor = Mockito.mock(Processor.class);
mockEndpoint = Mockito.mock(Endpoint.class);
Mockito.when(mockEndpoint.createExchange(ExchangePattern.InOnly)).thenReturn(mockExchange);
}
@@ -123,19 +121,17 @@ public class QuickfixjConsumerTest {
Mockito.when(mockExchange.hasOut()).thenReturn(true);
Mockito.when(mockExchange.getOut()).thenReturn(mockCamelOutMessage);
Message outboundFixMessage = new Message();
-
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
+
Mockito.when(mockCamelOutMessage.getBody(Message.class)).thenReturn(outboundFixMessage);
QuickfixjConsumer consumer = Mockito.spy(new
QuickfixjConsumer(mockEndpoint, mockProcessor));
Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
-
Mockito.doReturn(mockSession).when(consumer).getSession(MessageUtils.getReverseSessionID(inboundFixMessage));
-
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
+
Mockito.doReturn(mockSession).when(consumer).getSession(MessageUtils.getReverseSessionID(inboundFixMessage));
+
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
consumer.start();
-
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
consumer.onExchange(mockExchange);
-
Mockito.verify(mockExchange,
Mockito.never()).setException(Mockito.isA(Exception.class));
Mockito.verify(mockSession).send(outboundFixMessage);
}
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjProducerTest.java
Thu Jun 2 17:15:30 2011
@@ -45,16 +45,16 @@ import quickfix.field.TargetCompID;
import quickfix.fix42.Email;
public class QuickfixjProducerTest {
- private Exchange mockExchange;
- private QuickfixjEndpoint mockEndpoint;
- private org.apache.camel.Message mockCamelMessage;
- private QuickfixjProducer producer;
- private SessionID sessionID;
- private Message inboundFixMessage;
- private QuickfixjEngine quickfixjEngine;
+ private Exchange mockExchange;
+ private QuickfixjEndpoint mockEndpoint;
+ private org.apache.camel.Message mockCamelMessage;
+ private QuickfixjProducer producer;
+ private SessionID sessionID;
+ private Message inboundFixMessage;
+ private QuickfixjEngine quickfixjEngine;
- @Before
- public void setUp() throws ConfigError, FieldConvertError, IOException,
JMException {
+ @Before
+ public void setUp() throws ConfigError, FieldConvertError, IOException,
JMException {
mockExchange = Mockito.mock(Exchange.class);
mockEndpoint = Mockito.mock(QuickfixjEndpoint.class);
mockCamelMessage = Mockito.mock(org.apache.camel.Message.class);
@@ -75,28 +75,28 @@ public class QuickfixjProducerTest {
Mockito.when(mockEndpoint.getSessionID()).thenReturn(sessionID);
producer = Mockito.spy(new QuickfixjProducer(mockEndpoint));
- }
-
- @SuppressWarnings("serial")
- public class TestException extends RuntimeException {
-
- }
-
+ }
+
+ @SuppressWarnings("serial")
+ public class TestException extends RuntimeException {
+
+ }
+
@Test
public void setExceptionOnExchange() throws Exception {
Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
-
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
- Mockito.doThrow(new
TestException()).when(mockSession).send(Mockito.isA(Message.class));
+
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+ Mockito.doThrow(new
TestException()).when(mockSession).send(Mockito.isA(Message.class));
- producer.process(mockExchange);
+ producer.process(mockExchange);
Mockito.verify(mockExchange).setException(Matchers.isA(TestException.class));
}
@Test
public void processInOnlyExchange() throws Exception {
Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
-
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
-
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
+
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+
Mockito.doReturn(true).when(mockSession).send(Mockito.isA(Message.class));
producer.process(mockExchange);
@@ -107,37 +107,38 @@ public class QuickfixjProducerTest {
@Test
public void processInOutExchange() throws Exception {
Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOut);
-
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).
- thenReturn(new MessagePredicate(sessionID, MsgType.EMAIL));
+
Mockito.when(mockExchange.getProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY)).thenReturn(
+ new MessagePredicate(sessionID, MsgType.EMAIL));
Mockito.when(mockExchange.getProperty(
- QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
- 1000L, Long.class)).thenReturn(5000L);
-
+ QuickfixjProducer.CORRELATION_TIMEOUT_KEY,
+ 1000L, Long.class)).thenReturn(5000L);
+
org.apache.camel.Message mockOutboundCamelMessage =
Mockito.mock(org.apache.camel.Message.class);
-
Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
+
Mockito.when(mockExchange.getOut()).thenReturn(mockOutboundCamelMessage);
final Message outboundFixMessage = new Email();
outboundFixMessage.getHeader().setString(SenderCompID.FIELD, "TARGET");
outboundFixMessage.getHeader().setString(TargetCompID.FIELD, "SENDER");
Session mockSession =
Mockito.spy(TestSupport.createSession(sessionID));
-
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
- Mockito.doAnswer(new Answer<Boolean>() {
- @Override
- public Boolean answer(InvocationOnMock invocation)
throws Throwable {
- new Timer().schedule(new TimerTask() {
- @Override
- public void run() {
- try {
-
quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived,
sessionID, outboundFixMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, 10);
- return true;
- }
- }).when(mockSession).send(Mockito.isA(Message.class));
+
Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage));
+ Mockito.doAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws
Throwable {
+ new Timer().schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+
quickfixjEngine.getMessageCorrelator().onEvent(QuickfixjEventCategory.AppMessageReceived,
sessionID, outboundFixMessage);
+ } catch (Exception e) {
+ // TODO: probably not the best thing... use a LOG
+ e.printStackTrace();
+ }
+ }
+ }, 10);
+ return true;
+ }
+ }).when(mockSession).send(Mockito.isA(Message.class));
producer.process(mockExchange);
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java
Thu Jun 2 17:15:30 2011
@@ -73,49 +73,47 @@ public final class TestSupport {
}
public static Session createSession(SessionID sessionID) throws
ConfigError, IOException {
- MessageStoreFactory mockMessageStoreFactory =
Mockito.mock(MessageStoreFactory.class);
- MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
- Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date());
-
-
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
-
- DefaultSessionFactory factory = new DefaultSessionFactory(
- Mockito.mock(Application.class),
- mockMessageStoreFactory,
- Mockito.mock(LogFactory.class));
-
- SessionSettings settings = new SessionSettings();
- settings.setLong(Session.SETTING_HEARTBTINT, 10);
- settings.setString(Session.SETTING_START_TIME, "00:00:00");
- settings.setString(Session.SETTING_END_TIME, "00:00:00");
- settings.setString(SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
-
- return factory.create(sessionID, settings);
-
+ MessageStoreFactory mockMessageStoreFactory =
Mockito.mock(MessageStoreFactory.class);
+ MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
+ Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new
Date());
+
+
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
+
+ DefaultSessionFactory factory = new DefaultSessionFactory(
+ Mockito.mock(Application.class),
+ mockMessageStoreFactory,
+ Mockito.mock(LogFactory.class));
+
+ SessionSettings settings = new SessionSettings();
+ settings.setLong(Session.SETTING_HEARTBTINT, 10);
+ settings.setString(Session.SETTING_START_TIME, "00:00:00");
+ settings.setString(Session.SETTING_END_TIME, "00:00:00");
+ settings.setString(SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false);
+
+ return factory.create(sessionID, settings);
}
- public static QuickfixjEngine createEngine() throws ConfigError,
FieldConvertError, IOException, JMException {
- SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
+ public static QuickfixjEngine createEngine() throws ConfigError,
FieldConvertError, IOException, JMException {
+ SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET");
- MessageStoreFactory mockMessageStoreFactory =
Mockito.mock(MessageStoreFactory.class);
- MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
- Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date());
-
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
-
- SessionSettings settings = new SessionSettings();
-
- settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10);
- settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00");
- settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00");
- settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
- settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000);
- settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, false);
-
- return new QuickfixjEngine("", settings, false,
- mockMessageStoreFactory,
- Mockito.mock(LogFactory.class),
- Mockito.mock(MessageFactory.class));
-
+ MessageStoreFactory mockMessageStoreFactory =
Mockito.mock(MessageStoreFactory.class);
+ MessageStore mockMessageStore = Mockito.mock(MessageStore.class);
+ Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new
Date());
+
Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore);
+
+ SessionSettings settings = new SessionSettings();
+
+ settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10);
+ settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00");
+ settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00");
+ settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
+ settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000);
+ settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY,
false);
+
+ return new QuickfixjEngine("", settings, false,
+ mockMessageStoreFactory,
+ Mockito.mock(LogFactory.class),
+ Mockito.mock(MessageFactory.class));
}
}
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java
Thu Jun 2 17:15:30 2011
@@ -36,7 +36,6 @@ import org.apache.camel.component.quickf
import
org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer;
import
org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer;
import org.apache.camel.impl.DefaultCamelContext;
-import org.eclipse.jetty.util.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,21 +71,21 @@ public class RequestReplyExample {
RouteBuilder routes = new RouteBuilder() {
@Override
public void configure() throws Exception {
- // Synchronize the logon so we don't start sending status
requests too early
+ // Synchronize the logon so we don't start sending status
requests too early
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:TRADER->MARKET").
filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)).
bean(new CountDownLatchDecrementer("logon", logonLatch));
// Incoming status requests are converted to InOut exchange
pattern and passed to the
// order status service. The response is sent back to the
session making the request.
-
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut").
-
filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST)).
- bean(new MarketOrderStatusService());
+
from("quickfix:examples/inprocess.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut")
+
.filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST))
+ .bean(new MarketOrderStatusService());
- from ("jetty:" + orderStatusServiceUrl).
- bean(new OrderStatusRequestTransformer()).
- routingSlip(bean(FixSessionRouter.class, "route")).
- bean(new QuickfixjMessageJsonTransformer());
+ from("jetty:" + orderStatusServiceUrl)
+ .bean(new OrderStatusRequestTransformer())
+ .routingSlip(bean(FixSessionRouter.class, "route"))
+ .bean(new QuickfixjMessageJsonTransformer());
}
};
@@ -103,21 +102,19 @@ public class RequestReplyExample {
// Verify that the response is a JSON response.
URL orderStatusUrl = new URL(orderStatusServiceUrl +
"?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc");
- HttpURLConnection connection = (HttpURLConnection)
orderStatusUrl.openConnection();
+ HttpURLConnection connection = (HttpURLConnection)
orderStatusUrl.openConnection();
BufferedReader orderStatusReply = new BufferedReader(new
InputStreamReader(connection.getInputStream()));
String line = orderStatusReply.readLine();
if (!line.equals("\"message\": {")) {
- throw new Exception("Don't appear to be a JSON response");
- }
- else {
- StringBuilder sb = new StringBuilder();
- while (line != null) {
- sb.append(line);
- sb.append('\n');
- line = orderStatusReply.readLine();
- }
- Log.info("Web request response:\n" + sb);
-
+ throw new Exception("Don't appear to be a JSON response");
+ } else {
+ StringBuilder sb = new StringBuilder();
+ while (line != null) {
+ sb.append(line);
+ sb.append('\n');
+ line = orderStatusReply.readLine();
+ }
+ LOG.info("Web request response:\n" + sb);
}
orderStatusReply.close();
@@ -128,45 +125,44 @@ public class RequestReplyExample {
}
public static class OrderStatusRequestTransformer {
- public void transform(Exchange exchange) throws FieldNotFound {
- String sessionID = (String)
exchange.getIn().getHeader("sessionID");
- String orderID = (String) exchange.getIn().getHeader("orderID");
-
+ public void transform(Exchange exchange) throws FieldNotFound {
+ String sessionID = (String)
exchange.getIn().getHeader("sessionID");
+ String orderID = (String) exchange.getIn().getHeader("orderID");
+
OrderStatusRequest request = new OrderStatusRequest(new
ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY));
request.set(new OrderID(orderID));
// Look for a reply execution report back to the requester session
// and having the requested OrderID. This is a loose correlation
but the best
// we can do with FIX 4.2. Newer versions of FIX have an optional
explicit correlation field.
- exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY,
- new MessagePredicate(new SessionID(sessionID),
MsgType.EXECUTION_REPORT)
- .withField(OrderID.FIELD,
request.getString(OrderID.FIELD)));
+ exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY,
new MessagePredicate(
+ new SessionID(sessionID),
MsgType.EXECUTION_REPORT).withField(OrderID.FIELD,
request.getString(OrderID.FIELD)));
exchange.getIn().setBody(request);
- }
+ }
}
public static class MarketOrderStatusService {
private static final Logger LOG =
LoggerFactory.getLogger(QuickfixjComponent.class);
public ExecutionReport getOrderStatus(OrderStatusRequest request)
throws FieldNotFound {
- LOG.info("Received order status request for orderId=" +
request.getOrderID().getValue());
- return new ExecutionReport(
- request.getOrderID(), new
ExecID(UUID.randomUUID().toString()),
- new
ExecTransType(ExecTransType.STATUS),
- new ExecType(ExecType.REJECTED),
- new
OrdStatus(OrdStatus.REJECTED),
- new Symbol("GOOG"),
- new Side(Side.BUY),
- new LeavesQty(100),
- new CumQty(0),
- new AvgPx(0));
- }
+ LOG.info("Received order status request for orderId=" +
request.getOrderID().getValue());
+ return new ExecutionReport(request.getOrderID(),
+ new ExecID(UUID.randomUUID().toString()),
+ new ExecTransType(ExecTransType.STATUS),
+ new ExecType(ExecType.REJECTED),
+ new OrdStatus(OrdStatus.REJECTED),
+ new Symbol("GOOG"),
+ new Side(Side.BUY),
+ new LeavesQty(100),
+ new CumQty(0),
+ new AvgPx(0));
+ }
}
public static class FixSessionRouter {
- public String route(@Header("sessionID") String sessionID, @Body Object
body) {
- return
String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID);
- }
+ public String route(@Header("sessionID") String sessionID, @Body
Object body) {
+ return
String.format("quickfix:examples/inprocess.cfg?sessionID=%s", sessionID);
+ }
}
}
Modified:
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java?rev=1130661&r1=1130660&r2=1130661&view=diff
==============================================================================
---
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
(original)
+++
camel/trunk/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java
Thu Jun 2 17:15:30 2011
@@ -32,8 +32,8 @@ import quickfix.SessionID;
public class QuickfixjMessageJsonTransformer {
- public String transform(Message message) throws FieldNotFound,
ConfigError {
- SessionID sessionID = MessageUtils.getSessionID(message);
+ public String transform(Message message) throws FieldNotFound, ConfigError
{
+ SessionID sessionID = MessageUtils.getSessionID(message);
Session session = Session.lookupSession(sessionID);
DataDictionary dataDictionary = session.getDataDictionary();
@@ -41,9 +41,9 @@ public class QuickfixjMessageJsonTransfo
throw new IllegalStateException("No Data Dictionary. Exchange must
reference an existing session");
}
- return transform(message, dataDictionary);
- }
-
+ return transform(message, dataDictionary);
+ }
+
public String transform(Message message, DataDictionary dataDictionary) {
return transform(message, "", dataDictionary);
}