ignite-534 - IgniteJmsStreamer implemented. Closes #14.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0d701127 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0d701127 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0d701127 Branch: refs/heads/ignite-426 Commit: 0d701127c8a95a30f4c484247e84ab1f10f95e3a Parents: d0536dc Author: Raul Kripalani <ra...@apache.org> Authored: Fri Aug 14 16:11:19 2015 -0700 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Fri Aug 14 16:11:19 2015 -0700 ---------------------------------------------------------------------- .../ignite/internal/util/GridArgumentCheck.java | 17 + modules/jms11/pom.xml | 98 ++++ .../apache/ignite/stream/jms11/JmsStreamer.java | 524 ++++++++++++++++++ .../ignite/stream/jms11/MessageTransformer.java | 43 ++ .../stream/jms11/IgniteJmsStreamerTest.java | 551 +++++++++++++++++++ .../jms11/IgniteJmsStreamerTestSuite.java | 34 ++ .../ignite/stream/jms11/TestTransformers.java | 123 +++++ pom.xml | 1 + 8 files changed, 1391 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/core/src/main/java/org/apache/ignite/internal/util/GridArgumentCheck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridArgumentCheck.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridArgumentCheck.java index b94bae6..8289f90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridArgumentCheck.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridArgumentCheck.java @@ -35,6 +35,9 @@ public class GridArgumentCheck { /** Not empty argument error message suffix. */ private static final String NOT_EMPTY_SUFFIX = " must not be empty."; + /** Not null or empty error message suffix. 
*/ + private static final String NOT_NULL_OR_EMPTY_SUFFIX = " must not be null or empty."; + /** * Checks if given argument value is not {@code null}. Otherwise - throws {@link NullPointerException}. * @@ -145,4 +148,18 @@ public class GridArgumentCheck { if (arr.length == 0) throw new IllegalArgumentException(INVALID_ARG_MSG_PREFIX + name + NOT_EMPTY_SUFFIX); } + + /** + * Checks that a String is not null or empty. + * + * @param value Value to check. + * @param name Argument name. + */ + public static void notNullOrEmpty(String value, String name) { + notNull(value, name); + + if (value.trim().length() == 0) { + throw new IllegalArgumentException(INVALID_ARG_MSG_PREFIX + name + NOT_NULL_OR_EMPTY_SUFFIX); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/pom.xml ---------------------------------------------------------------------- diff --git a/modules/jms11/pom.xml b/modules/jms11/pom.xml new file mode 100644 index 0000000..c93ea9a --- /dev/null +++ b/modules/jms11/pom.xml @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-jms11</artifactId> + <version>1.4.1-SNAPSHOT</version> + + <properties> + <activemq.version>5.11.1</activemq.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>javax.jms</groupId> + <artifactId>jms-api</artifactId> + <version>1.1-rev-1</version> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-broker</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-kahadb-store</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + 
+</project> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java b/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java new file mode 100644 index 0000000..de84a3e --- /dev/null +++ b/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/JmsStreamer.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.stream.*; + +import javax.jms.*; +import javax.jms.Queue; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Streamer that consumes from a JMS destination and feeds key-value pairs into an {@link IgniteDataStreamer} instance. + * <p> + * This Streamer uses purely JMS semantics and it is not coupled with any JMS implementation. It uses {@link + * MessageListener} to receive messages. 
You must provide your broker's {@link javax.jms.ConnectionFactory} when + * creating a {@link JmsStreamer}. + * <p> + * You must also provide a {@link MessageTransformer} to convert the incoming message into cache entries. + * <p> + * This Streamer has many features: + * + * <ul> + * <li>Consumes from queues or topics.</li> + * <li>For topics, it supports durable subscriptions.</li> + * <li>Concurrent consumers are supported via the <tt>threads</tt> parameter. When consuming from queues, + * this component will start as many {@link Session} objects with separate {@link MessageListener} instances each, + * therefore achieving <i>native</i> concurrency (in terms of the JMS standard).<br> + * When consuming from topics, obviously we cannot start multiple threads as that would lead us to consume + * duplicate messages. Therefore, we achieve concurrency in a <i>virtualized</i> manner through an internal + * thread pool.</li> + * <li>Transacted sessions are supported through the <tt>transacted</tt> parameter.</li> + * <li>Batched consumption is possible via the <tt>batched</tt> parameter. Depending on the broker, this + * technique can provide a higher throughput as it decreases the amount of message acknowledgement round trips + * that are necessary, albeit at the expense of possible duplicate messages (especially if an incident + * occurs in the middle of a transaction).<br> + * Batches are committed when the <tt>batchClosureMillis</tt> time has elapsed, or when a Session has received + * at least <tt>batchClosureSize</tt> messages. Time-based closure fires with the specified frequency and applies to + * all {@link Session}s in parallel. Size-based closure applies individually to each <tt>Session</tt> (as transactions + * are <tt>Session-bound</tt> in JMS), so it will fire when that {@link Session} has processed that many messages. 
+ * Both options are compatible with each other, or you can disable either (see setter documentation), + * but not both.</li> + * <li>Can specify the destination with implementation-specific {@link Destination} objects or with names.</li> + * </ul> + + * + * @author Raul Kripalani + */ +public class JmsStreamer<T extends Message, K, V> extends StreamAdapter<T, K, V> { + + /** Logger. */ + private IgniteLogger log; + + /** + * <i>Compulsory.</i> The message transformer that converts an incoming JMS {@link Message} (or subclass) into one + * or multiple cache entries. + */ + private MessageTransformer<T, K, V> transformer; + + /** The JMS {@link ConnectionFactory} to use. */ + private ConnectionFactory connectionFactory; + + /** Whether to register or not as a durable subscription (for topic consumption). */ + private boolean durableSubscription; + + /** Name of the durable subscription, as required by the JMS specification. */ + private String durableSubscriptionName; + + /** Client ID in case we're using durable subscribers. */ + private String clientId; + + /** The JMS {@link Destination}; takes precedence over destinationName if both are set. */ + private Destination destination; + + /** Name of the destination. */ + private String destinationName; + + /** Whether to consume in a transacted manner. */ + private boolean transacted; + + /** Whether to consume messages in batches. May lead to duplicate consumption. Value <tt>true</tt> implies + * <tt>transacted = true</tt>. */ + private boolean batched; + + /** When using batched consumers, the amount of messages after which the batch (transaction) will be committed. */ + private int batchClosureSize = 50; + + /** + * When using batched consumers, the amount of time to wait before the batch (transaction) will be committed. A + * value of 0 or -1 disables time-based session commits. + */ + private long batchClosureMillis = 1000; + + /** Destination type. */ + private Class<? 
extends Destination> destinationType = Queue.class; + + /** + * Number of threads to concurrently consume JMS messages. When working with queues, we will start as many {@link + * javax.jms.Session} objects as indicated by this field, i.e. you will get native concurrency. On the other hand, + * when consuming from a topic, for obvious reason we will only start 1 message consumer but we will distribute the + * processing of received messages to as many concurrent threads as indicated. + */ + private int threads = 1; + + /** Whether we are stopped or not. */ + private volatile boolean stopped = true; + + /** JMS Connection. */ + private Connection connection; + + /** Stores the current JMS Sessions. */ + private Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap<Session, Boolean>()); + + /** Message consumers. */ + private Set<MessageConsumer> consumers = Collections.newSetFromMap(new ConcurrentHashMap<MessageConsumer, Boolean>()); + + /** Message listeners. */ + private Set<IgniteJmsMessageListener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<IgniteJmsMessageListener, Boolean>()); + + /** Scheduler for handling {@link #batchClosureMillis}. */ + private ScheduledExecutorService scheduler; + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() throws IgniteException { + if (!stopped) + throw new IgniteException("Attempted to start an already started JMS Streamer"); + + try { + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + + log = getIgnite().log(); + + A.notNull(transformer, "message transformer"); + A.notNull(connectionFactory, "connection factory"); + A.ensure(threads > 0, "threads > 0"); + + // handle batched && transacted parameter interaction + if (batched && !transacted) { + log.warning("Starting a Batched JMS Streamer without transacted flag = true. 
Setting it automatically."); + transacted = true; + } + + // handle batch completion criteria + if (batched) { + A.ensure(batchClosureMillis > 0 || batchClosureSize > 0, "at least one of batch closure size or " + + "batch closure frequency must be specified when using batch consumption"); + } + + // check the parameters needed for durable subscriptions, if enabled + if (durableSubscription) { + A.notNullOrEmpty(clientId, "client id is compulsory when using durable subscriptions"); + A.notNullOrEmpty(durableSubscriptionName, "durable subscription name is compulsory when using " + + "durable subscriptions"); + } + + // validate the destination; if we have an explicit destination, make sure it's of type Queue or Topic; + // else make sure that the destinationName and the destinationType are valid + if (destination == null) { + A.notNull(destinationType, "destination type"); + A.ensure(destinationType.isAssignableFrom(Queue.class) || destinationType.isAssignableFrom(Topic.class), + "this streamer can only handle Queues or Topics."); + A.notNullOrEmpty(destinationName, "destination or destination name"); + } + else if (destination instanceof Queue) { + destinationType = Queue.class; + } + else if (destination instanceof Topic) { + destinationType = Topic.class; + } + else { + throw new IllegalArgumentException("Invalid destination object. Can only handle Queues or Topics."); + } + + // create a new connection and the client iD if relevant. 
+ connection = connectionFactory.createConnection(); + if (clientId != null && clientId.trim().length() > 0) { + connection.setClientID(clientId.trim()); + } + + // build the JMS objects + if (destinationType == Queue.class) { + initializeJmsObjectsForQueue(); + } + else { + initializeJmsObjectsForTopic(); + } + + stopped = false; + + // start the JMS connection + connection.start(); + + // set up the scheduler service for committing batches + if (batched && batchClosureMillis > 0) { + scheduler = Executors.newScheduledThreadPool(1); + scheduler.schedule(new Runnable() { + @Override + public void run() { + for (Session session : sessions) { + try { + session.commit(); + if (log.isDebugEnabled()) { + log.debug("Committing session from time-based batch completion [session=" + + session + "]"); + } + } + catch (JMSException e) { + log.warning("Error while committing session: from batch time-based completion " + + "[session=" + session + "]"); + } + } + for (IgniteJmsMessageListener ml : listeners) { + ml.resetBatchCounter(); + } + } + }, batchClosureMillis, TimeUnit.MILLISECONDS); + } + + } + catch (Throwable t) { + throw new IgniteException("Exception while initializing JmsStreamer", t); + } + + } + + /** + * Stops streamer. + */ + public void stop() throws IgniteException { + if (stopped) + throw new IgniteException("Attempted to stop an already stopped JMS Streamer"); + + + try { + stopped = true; + + if (scheduler != null && !scheduler.isShutdown()) { + scheduler.shutdown(); + scheduler = null; + } + + connection.stop(); + connection.close(); + + for (Session s : sessions) { + s.close(); + } + + sessions.clear(); + consumers.clear(); + listeners.clear(); + } + catch (Throwable t) { + throw new IgniteException("Exception while stopping JmsStreamer", t); + } + } + + /** + * Sets the JMS {@link ConnectionFactory}. + * + * @param connectionFactory JMS {@link ConnectionFactory} for this streamer to use. 
+ */ + public void setConnectionFactory(ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + /** + * <i>Compulsory.</i> The {@link MessageTransformer} that converts an incoming JMS {@link Message} (or subclass) + * into one or multiple cache entries. + * + * @param transformer The implementation of the MessageTransformer to use. + */ + public void setTransformer(MessageTransformer<T, K, V> transformer) { + this.transformer = transformer; + } + + /** + * Sets the JMS {@link Destination} explicitly. Takes precedence over destinationName if both are set. + * + * @param destination JMS {@link Destination} if setting it explicitly. + */ + public void setDestination(Destination destination) { + this.destination = destination; + } + + /** + * Sets the name of the JMS destination to consume from. + * + * @param destinationName The name of the destination; will be passed on directly to the broker. + */ + public void setDestinationName(String destinationName) { + this.destinationName = destinationName; + } + + /** + * Sets the type of the destination to create, when used in combination with {@link #setDestinationName(String)}. It + * can be an interface or the implementation class specific to the broker. + * + * @param destinationType The class representing the destination type. Suggested values: {@link Queue} or {@link + * Topic}. <i>Compulsory</i> if using {@link #destinationName}. + * @see Queue + * @see Topic + */ + public void setDestinationType(Class<? extends Destination> destinationType) { + this.destinationType = destinationType; + } + + /** + * Sets the number of threads to concurrently consume JMS messages. <p> When working with queues, we will start as + * many {@link javax.jms.Session} objects as indicated by this field, i.e. you will get native concurrency. 
<p> On + * the other hand, when consuming from a topic, for obvious reason we will only start 1 message consumer but we will + * distribute the processing of received messages to as many concurrent threads as indicated. + * + * @param threads Number of threads to use. Default: <tt>1</tt>. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets the client ID of the JMS {@link Connection}. + * + * @param clientId Client ID in case we're using durable subscribers. Default: none. + */ + public void setClientId(String clientId) { + this.clientId = clientId; + } + + /** + * A <tt>true</tt> value is only accepted in combination with topic consumption. + * + * @param durableSubscription Whether or not to use durable subscriptions. Default: <tt>false</tt>. + */ + public void setDurableSubscription(boolean durableSubscription) { + this.durableSubscription = durableSubscription; + } + + /** + * Instructs the streamer whether to use local JMS transactions or not. + * + * @param transacted Whether to consume or not in a transacted manner. Default: <tt>false</tt>. + */ + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + /** + * Batch consumption leverages JMS Transactions to minimise round trips to the broker. <p> Rather than ACKing every + * single message received, they will be received in the context of a JMS transaction which will be committed once + * the indicated batch closure size or batch closure time has elapsed. <p> Warning: May lead to duplicate + * consumption. + * + * @param batched Whether to consume messages in batches. Value <tt>true</tt> implies <tt>transacted = true</tt>. + * Default: <tt>false</tt>. + * @see #setBatchClosureMillis(long) + * @see #setBatchClosureSize(int) + */ + public void setBatched(boolean batched) { + this.batched = batched; + } + + /** + * When using batched consumption, sets the amount of messages that will be received before a batch is committed. 
+ * + * @param batchClosureSize The amount of messages processed before a batch is committed. Default: <tt>50</tt>. + */ + public void setBatchClosureSize(int batchClosureSize) { + this.batchClosureSize = batchClosureSize; + } + + /** + * When using batched consumption, sets the time in milliseconds that will elapse before a batch is committed. + * + * @param batchClosureMillis Milliseconds before a batch is committed. Default: <tt>1000ms</tt>. + */ + public void setBatchClosureMillis(long batchClosureMillis) { + this.batchClosureMillis = batchClosureMillis; + } + + /** + * When using Durable Subscribers, sets the name of the durable subscriber. It is compulsory. + * + * @param durableSubscriptionName Name of the durable subscriber. Default: none. + */ + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + private void initializeJmsObjectsForTopic() throws JMSException { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Topic topic = (Topic)destination; + + if (destination == null) + topic = session.createTopic(destinationName); + + MessageConsumer consumer = durableSubscription ? 
session.createDurableSubscriber(topic, durableSubscriptionName) : + session.createConsumer(topic); + + IgniteJmsMessageListener messageListener = new IgniteJmsMessageListener(session, true); + consumer.setMessageListener(messageListener); + + consumers.add(consumer); + sessions.add(session); + listeners.add(messageListener); + } + + private void initializeJmsObjectsForQueue() throws JMSException { + for (int i = 0; i < threads; i++) { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + if (destination == null) + destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + + IgniteJmsMessageListener messageListener = new IgniteJmsMessageListener(session, false); + consumer.setMessageListener(messageListener); + + consumers.add(consumer); + sessions.add(session); + listeners.add(messageListener); + } + } + + private void processMessage(T message) { + final IgniteDataStreamer<K, V> streamer = getStreamer(); + + Map<K, V> entries = transformer.apply(message); + + if (entries == null || entries.size() == 0) + return; + + streamer.addData(entries); + } + + /** + * Message listener for queues. + */ + private class IgniteJmsMessageListener implements MessageListener { + + private Session session; + private AtomicInteger counter = new AtomicInteger(0); + private Executor executor; + + public IgniteJmsMessageListener(Session session, boolean createThreadPool) { + this.session = session; + + // if we don't need a thread pool, create a dummy one that executes the task synchronously + //noinspection NullableProblems + this.executor = createThreadPool ? 
Executors.newFixedThreadPool(threads) : new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + + @Override + public void onMessage(final Message message) { + if (stopped) { + return; + } + + executor.execute(new Runnable() { + @Override @SuppressWarnings("unchecked") + public void run() { + processMessage((T)message); + if (batched) { + // batch completion may be handled by timer only + if (batchClosureSize <= 0) + return; + + else if (counter.incrementAndGet() >= batchClosureSize) { + try { + session.commit(); + counter.set(0); + } + catch (Exception e) { + log.warning("Could not commit JMS session upon completion of batch.", e); + } + } + } + else if (transacted) { + try { + session.commit(); + } + catch (JMSException e) { + log.warning("Could not commit JMS session (non-batched).", e); + } + } + } + }); + + } + + public void resetBatchCounter() { + counter.set(0); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java b/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java new file mode 100644 index 0000000..3cb0231 --- /dev/null +++ b/modules/jms11/src/main/java/org/apache/ignite/stream/jms11/MessageTransformer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import org.jetbrains.annotations.*; + +import javax.jms.*; +import java.util.*; + +/** + * Implement this interface to transform from a {@link Message} to a set of cache entries in the form of a {@link Map}. + * + * @param <T> The type of JMS Message. + * @param <K> The type of the cache key. + * @param <V> The type of the cache value. + * @author Raul Kripalani + */ +public interface MessageTransformer<T extends Message, K, V> { + + /** + * Transformation function. + * + * @param message The message received from the JMS broker. + * @return Set of cache entries to add to the cache. It could be empty or null if the message should be skipped. + */ + @Nullable Map<K, V> apply(T message); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java new file mode 100644 index 0000000..f1c107d --- /dev/null +++ b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTest.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import org.apache.activemq.*; +import org.apache.activemq.broker.*; +import org.apache.activemq.broker.region.*; +import org.apache.activemq.broker.region.policy.*; +import org.apache.activemq.command.*; +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; + +import org.junit.*; + +import javax.jms.*; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Test for {@link JmsStreamer}. Tests both queues and topics. 
+ * + * @author Raul Kripalani + */ +public class IgniteJmsStreamerTest extends GridCommonAbstractTest { + + private static final int CACHE_ENTRY_COUNT = 100; + private static final String QUEUE_NAME = "ignite.test.queue"; + private static final String TOPIC_NAME = "ignite.test.topic"; + private static final Map<String, String> TEST_DATA = new HashMap<>(); + + static { + for (int i = 1; i <= CACHE_ENTRY_COUNT; i++) + TEST_DATA.put(Integer.toString(i), "v" + i); + } + + private BrokerService broker; + private ConnectionFactory connectionFactory; + + /** Constructor. */ + public IgniteJmsStreamerTest() { + super(true); + } + + @Before @SuppressWarnings("unchecked") + public void beforeTest() throws Exception { + grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); + + broker = new BrokerService(); + broker.deleteAllMessages(); + broker.setPersistent(false); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setQueuePrefetch(1); + broker.setDestinationPolicy(policyMap); + broker.getDestinationPolicy().setDefaultEntry(policy); + + broker.start(true); + + connectionFactory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + + } + + @After + public void afterTest() throws Exception { + grid().cache(null).clear(); + + broker.deleteAllMessages(); + broker.stop(); + } + + public void testQueueFromName() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestinationType(Queue.class); + jmsStreamer.setDestinationName(QUEUE_NAME); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = 
subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testTopicFromName() throws JMSException, InterruptedException { + Destination destination = new ActiveMQTopic(TOPIC_NAME); + + // should not produced messages until subscribed to the topic; otherwise they will be missed because this is not + // a durable subscriber (for which a dedicated test exists) + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestinationType(Topic.class); + jmsStreamer.setDestinationName(TOPIC_NAME); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // produce messages + produceObjectMessages(destination, false); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testQueueFromExplicitDestination() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + // start the streamer + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + 
+ assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testTopicFromExplicitDestination() throws JMSException, InterruptedException { + Destination destination = new ActiveMQTopic(TOPIC_NAME); + + // should not produce messages until subscribed to the topic; otherwise they will be missed because this is not + // a durable subscriber (for which a dedicated test exists) + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // produce messages + produceObjectMessages(destination, false); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testInsertMultipleCacheEntriesFromOneMessage() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce A SINGLE MESSAGE, containing all data, into the queue + produceStringMessages(destination, true); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testDurableSubscriberStartStopStart() throws Exception { + Destination destination = new 
ActiveMQTopic(TOPIC_NAME); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + jmsStreamer.setDurableSubscription(true); + jmsStreamer.setClientId(Long.toString(System.currentTimeMillis())); + jmsStreamer.setDurableSubscriptionName("ignite-test-durable"); + + // we start the streamer so that the durable subscriber registers itself + jmsStreamer.start(); + + // we stop it immediately + jmsStreamer.stop(); + + // we assert that there are no clients of the broker (to make sure we disconnected properly) + assertEquals(0, broker.getCurrentConnections()); + + // we send messages while we're still away + produceStringMessages(destination, false); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testQueueMessagesConsumedInBatchesCompletionSizeBased() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + jmsStreamer.setBatched(true); + jmsStreamer.setBatchClosureSize(99); + + // disable time-based session commits + jmsStreamer.setBatchClosureMillis(0); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); 
+ + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + // we expect all entries to be loaded, but still one (uncommitted) message should remain in the queue + // as observed by the broker + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + assertEquals(1, qStats.getMessages().getCount()); + assertEquals(1, qStats.getInflight().getCount()); + + jmsStreamer.stop(); + } + + } + + public void testQueueMessagesConsumedInBatchesCompletionTimeBased() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + jmsStreamer.setBatched(true); + jmsStreamer.setBatchClosureMillis(2000); + // disable size-based session commits + jmsStreamer.setBatchClosureSize(0); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + + jmsStreamer.start(); + + // all messages are still inflight + assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount()); + assertEquals(0, qStats.getDequeues().getCount()); + + // wait a little bit + Thread.sleep(100); + + // all messages are still inflight + assertEquals(CACHE_ENTRY_COUNT, qStats.getMessages().getCount()); + assertEquals(0, qStats.getDequeues().getCount()); + + // now let the scheduler execute + Thread.sleep(2100); + + // all messages are committed + assertEquals(0, qStats.getMessages().getCount()); + assertEquals(CACHE_ENTRY_COUNT, 
qStats.getDequeues().getCount()); + + latch.await(5, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testGenerateNoEntries() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + // override the transformer with one that generates no cache entries + jmsStreamer.setTransformer(TestTransformers.generateNoEntries()); + jmsStreamer.setDestination(destination); + + // subscribe to cache PUT events and return a countdown latch starting at 1 + CountDownLatch latch = subscribeToPutEvents(1); + + jmsStreamer.start(); + + // no cache PUT events were received in 3 seconds, i.e. CountDownLatch does not fire + assertFalse(latch.await(3, TimeUnit.SECONDS)); + + jmsStreamer.stop(); + } + + } + + public void testTransactedSessionNoBatching() throws Exception { + Destination destination = new ActiveMQQueue(QUEUE_NAME); + + // produce multiple messages into the queue + produceStringMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<TextMessage, String, String> jmsStreamer = newJmsStreamer(TextMessage.class, dataStreamer); + jmsStreamer.setTransacted(true); + jmsStreamer.setDestination(destination); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + jmsStreamer.start(); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + public void testQueueMultipleThreads() throws Exception { + Destination destination = 
new ActiveMQQueue(QUEUE_NAME); + + // produce messages into the queue + produceObjectMessages(destination, false); + + try (IgniteDataStreamer<String, String> dataStreamer = grid().dataStreamer(null)) { + JmsStreamer<ObjectMessage, String, String> jmsStreamer = newJmsStreamer(ObjectMessage.class, dataStreamer); + jmsStreamer.setDestination(destination); + jmsStreamer.setThreads(5); + + // subscribe to cache PUT events and return a countdown latch starting at CACHE_ENTRY_COUNT + CountDownLatch latch = subscribeToPutEvents(CACHE_ENTRY_COUNT); + + // start the streamer + jmsStreamer.start(); + + DestinationStatistics qStats = broker.getBroker().getDestinationMap().get(destination).getDestinationStatistics(); + assertEquals(5, qStats.getConsumers().getCount()); + + // all cache PUT events received in 10 seconds + latch.await(10, TimeUnit.SECONDS); + + // assert that all consumers received messages - given that the prefetch is 1 + for (Subscription subscription : broker.getBroker().getDestinationMap().get(destination).getConsumers()) + assertTrue(subscription.getDequeueCounter() > 0); + + assertAllCacheEntriesLoaded(); + + jmsStreamer.stop(); + } + + } + + private void assertAllCacheEntriesLoaded() { + // Get the cache and check that the entries are present + IgniteCache<String, String> cache = grid().cache(null); + for (Map.Entry<String, String> entry : TEST_DATA.entrySet()) + assertEquals(entry.getValue(), cache.get(entry.getKey())); + } + + @SuppressWarnings("unchecked") + private <T extends Message> JmsStreamer<T, String, String> newJmsStreamer(Class<T> type, + IgniteDataStreamer<String, String> dataStreamer) { + + JmsStreamer<T, String, String> jmsStreamer = new JmsStreamer<>(); + jmsStreamer.setIgnite(grid()); + jmsStreamer.setStreamer(dataStreamer); + jmsStreamer.setConnectionFactory(connectionFactory); + + if (type == ObjectMessage.class) { + jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forObjectMessage()); + } + else { + 
jmsStreamer.setTransformer((MessageTransformer<T, String, String>) TestTransformers.forTextMessage()); + } + + dataStreamer.allowOverwrite(true); + dataStreamer.autoFlushFrequency(10); + return jmsStreamer; + } + + private CountDownLatch subscribeToPutEvents(int expect) { + Ignite ignite = grid(); + + // Listen to cache PUT events and expect as many events as the given count + final CountDownLatch latch = new CountDownLatch(expect); + @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + return latch; + } + + private void produceObjectMessages(Destination destination, boolean singleMessage) throws JMSException { + Session session = connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer mp = session.createProducer(destination); + HashSet<TestTransformers.TestObject> set = new HashSet<>(); + + for (String key : TEST_DATA.keySet()) { + TestTransformers.TestObject to = new TestTransformers.TestObject(key, TEST_DATA.get(key)); + set.add(to); + } + + int messagesSent; + if (singleMessage) { + mp.send(session.createObjectMessage(set)); + messagesSent = 1; + } + else { + for (TestTransformers.TestObject to : set) + mp.send(session.createObjectMessage(to)); + + messagesSent = set.size(); + } + + if (destination instanceof Queue) { + try { + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(destination) + .getDestinationStatistics().getMessages().getCount()); + } + catch (Exception e) { + fail(e.toString()); + } + } + + } + + private void produceStringMessages(Destination destination, boolean singleMessage) throws JMSException { + Session session = connectionFactory.createConnection().createSession(false, 
Session.AUTO_ACKNOWLEDGE); + MessageProducer mp = session.createProducer(destination); + HashSet<String> set = new HashSet<>(); + + for (String key : TEST_DATA.keySet()) + set.add(key + "," + TEST_DATA.get(key)); + + int messagesSent; + if (singleMessage) { + StringBuilder sb = new StringBuilder(); + + for (String s : set) + sb.append(s).append("|"); + + sb.deleteCharAt(sb.length() - 1); + mp.send(session.createTextMessage(sb.toString())); + messagesSent = 1; + + } + else { + for (String s : set) { + mp.send(session.createTextMessage(s)); + } + messagesSent = set.size(); + } + + if (destination instanceof Queue) { + try { + assertEquals(messagesSent, broker.getBroker().getDestinationMap().get(destination) + .getDestinationStatistics().getMessages().getCount()); + } + catch (Exception e) { + fail(e.toString()); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java new file mode 100644 index 0000000..e299f04 --- /dev/null +++ b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/IgniteJmsStreamerTestSuite.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import org.junit.runner.*; +import org.junit.runners.*; + +/** + * JMS streamer tests. + * + * @author Raul Kripalani + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + IgniteJmsStreamerTest.class +}) +public class IgniteJmsStreamerTestSuite { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java ---------------------------------------------------------------------- diff --git a/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java new file mode 100644 index 0000000..ce001a1 --- /dev/null +++ b/modules/jms11/src/test/java/org/apache/ignite/stream/jms11/TestTransformers.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.jms11; + +import javax.jms.*; +import java.io.*; +import java.util.*; + +/** + * Test transformers for JmsStreamer tests. + * + * @author Raul Kripalani + */ +public class TestTransformers { + + /** + * Returns a transformer for JMS {@link TextMessage}s, capable of extracting many tuples from a single message, + * if pipe characters are encountered. + * + * @return Transformer that maps each {@code key,value} pair of the message text to a cache entry. + */ + public static MessageTransformer<TextMessage, String, String> forTextMessage() { + return new MessageTransformer<TextMessage, String, String>() { + @Override + public Map<String, String> apply(TextMessage message) { + final Map<String, String> answer = new HashMap<>(); + String text; + try { + text = message.getText(); + } + catch (JMSException e) { + e.printStackTrace(); + return Collections.emptyMap(); + } + for (String s : text.split("\\|")) { + String[] tokens = s.split(","); + answer.put(tokens[0], tokens[1]); + } + return answer; + } + }; + } + + /** + * Returns a transformer for JMS {@link ObjectMessage}s, capable of extracting many tuples from a single message, + * if the payload is a {@link Collection}. 
+ * + * @return Transformer that maps each {@link TestObject} in the message payload to a cache entry. + */ + public static MessageTransformer<ObjectMessage, String, String> forObjectMessage() { + return new MessageTransformer<ObjectMessage, String, String>() { + @Override @SuppressWarnings("unchecked") + public Map<String, String> apply(ObjectMessage message) { + Object object; + try { + object = message.getObject(); + } + catch (JMSException e) { + e.printStackTrace(); + return Collections.emptyMap(); + } + + final Map<String, String> answer = new HashMap<>(); + if (object instanceof Collection) { + for (TestObject to : (Collection<TestObject>)object) + answer.put(to.getKey(), to.getValue()); + + } + else if (object instanceof TestObject) { + TestObject to = (TestObject)object; + answer.put(to.getKey(), to.getValue()); + } + return answer; + } + }; + } + + public static MessageTransformer<TextMessage, String, String> generateNoEntries() { + return new MessageTransformer<TextMessage, String, String>() { + @Override + public Map<String, String> apply(TextMessage message) { + return null; + } + }; + } + + public static class TestObject implements Serializable { + private static final long serialVersionUID = -7332027566186690945L; + + private String key; + private String value; + + public TestObject(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d701127/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4d53d96..f3a5d65 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ <module>modules/mesos</module> <module>modules/kafka</module> <module>modules/yarn</module> + <module>modules/jms11</module> </modules> <profiles>