ignite-428 Review fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/63fce5a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/63fce5a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/63fce5a4

Branch: refs/heads/ignite-1056
Commit: 63fce5a4920531ad8cd9afc47d829bb2fa4bc438
Parents: 2c41739
Author: agura <ag...@gridgain.com>
Authored: Thu Jun 25 22:18:00 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri Jul 3 19:39:12 2015 +0300

----------------------------------------------------------------------
 modules/kafka/pom.xml                           |  18 +-
 .../ignite/stream/kafka/KafkaStreamer.java      | 121 ++++++---
 .../stream/kafka/KafkaEmbeddedBroker.java       | 251 ++++++++++---------
 .../kafka/KafkaIgniteStreamerSelfTest.java      | 131 +++++-----
 .../ignite/stream/kafka/SimplePartitioner.java  |  27 +-
 5 files changed, 293 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index 165ec1c..43909bc 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -31,7 +31,7 @@
     </parent>

     <artifactId>ignite-kafka</artifactId>
-    <version>1.1.1-SNAPSHOT</version>
+    <version>1.1.6-SNAPSHOT</version>

     <dependencies>
         <dependency>
@@ -39,6 +39,7 @@
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
@@ -66,6 +67,7 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.apache.zookeeper</groupId>
             <artifactId>zookeeper</artifactId>
@@ -73,12 +75,6 @@
         </dependency>

         <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>2.4</version>
-        </dependency>
-
-        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-log4j</artifactId>
            <version>${project.version}</version>
@@ -90,7 +86,6 @@
             <version>4.2</version>
         </dependency>

-
         <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
@@ -103,13 +98,6 @@
         </dependency>

         <dependency>
-            <groupId>commons-beanutils</groupId>
-            <artifactId>commons-beanutils</artifactId>
-            <version>1.8.3</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-spring</artifactId>
             <version>${project.version}</version>


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index e0240ce..e9ad0bd 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -30,16 +30,17 @@ import java.util.*;
 import java.util.concurrent.*;

 /**
- * Server that subscribes to topic messages from Kafka broker, streams its to key-value pairs into {@link
- * org.apache.ignite.IgniteDataStreamer} instance.
+ * Server that subscribes to topic messages from Kafka broker and streams them as key-value pairs into an
+ * {@link IgniteDataStreamer} instance.
  * <p>
- * Uses Kafka's High Level Consumer API to read messages from Kafka
+ * Uses Kafka's High Level Consumer API to read messages from Kafka.
  *
  * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
  * Example</a>
  */
-public class KafkaStreamer<T, K, V>
-    extends StreamAdapter<T, K, V> {
+public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Retry timeout. */
+    private static final long DFLT_RETRY_TIMEOUT = 10000;

     /** Logger. */
     private IgniteLogger log;

@@ -53,61 +54,78 @@ public class KafkaStreamer<T, K, V>
     /** Number of threads to process kafka streams. */
     private int threads;

-    /** Kafka Consumer Config. */
-    private ConsumerConfig consumerConfig;
+    /** Kafka consumer config. */
+    private ConsumerConfig consumerCfg;

-    /** Key Decoder. */
+    /** Key decoder. */
     private Decoder<K> keyDecoder;

-    /** Value Decoder. */
-    private Decoder<V> valueDecoder;
+    /** Value decoder. */
+    private Decoder<V> valDecoder;

-    /** Kafka Consumer connector. */
+    /** Kafka consumer connector. */
     private ConsumerConnector consumer;

+    /** Retry timeout. */
+    private long retryTimeout = DFLT_RETRY_TIMEOUT;
+
+    /** Stopped. */
+    private volatile boolean stopped;
+
     /**
-     * Sets the topic.
+     * Sets the topic name.
      *
-     * @param topic Topic Name.
+     * @param topic Topic name.
      */
-    public void setTopic(final String topic) {
+    public void setTopic(String topic) {
         this.topic = topic;
     }

     /**
      * Sets the threads.
      *
-     * @param threads Number of Threads.
+     * @param threads Number of threads.
      */
-    public void setThreads(final int threads) {
+    public void setThreads(int threads) {
         this.threads = threads;
     }

     /**
      * Sets the consumer config.
      *
-     * @param consumerConfig Consumer configuration.
+     * @param consumerCfg Consumer configuration.
      */
-    public void setConsumerConfig(final ConsumerConfig consumerConfig) {
-        this.consumerConfig = consumerConfig;
+    public void setConsumerConfig(ConsumerConfig consumerCfg) {
+        this.consumerCfg = consumerCfg;
     }

     /**
      * Sets the key decoder.
      *
-     * @param keyDecoder Key Decoder.
+     * @param keyDecoder Key decoder.
      */
-    public void setKeyDecoder(final Decoder<K> keyDecoder) {
+    public void setKeyDecoder(Decoder<K> keyDecoder) {
         this.keyDecoder = keyDecoder;
     }

     /**
      * Sets the value decoder.
      *
-     * @param valueDecoder Value Decoder
+     * @param valDecoder Value decoder.
      */
-    public void setValueDecoder(final Decoder<V> valueDecoder) {
-        this.valueDecoder = valueDecoder;
+    public void setValueDecoder(Decoder<V> valDecoder) {
+        this.valDecoder = valDecoder;
+    }
+
+    /**
+     * Sets the retry timeout.
+     *
+     * @param retryTimeout Retry timeout.
+     */
+    public void setRetryTimeout(long retryTimeout) {
+        A.ensure(retryTimeout > 0, "retryTimeout > 0");
+
+        this.retryTimeout = retryTimeout;
     }

     /**
@@ -120,35 +138,56 @@ public class KafkaStreamer<T, K, V>
         A.notNull(getIgnite(), "ignite");
         A.notNull(topic, "topic");
         A.notNull(keyDecoder, "key decoder");
-        A.notNull(valueDecoder, "value decoder");
-        A.notNull(consumerConfig, "kafka consumer config");
+        A.notNull(valDecoder, "value decoder");
+        A.notNull(consumerCfg, "kafka consumer config");
         A.ensure(threads > 0, "threads > 0");

         log = getIgnite().log();

-        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerCfg);
+
+        Map<String, Integer> topicCntMap = new HashMap<>();

-        Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(topic, threads);
+        topicCntMap.put(topic, threads);

-        Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder,
-            valueDecoder);
+        Map<String, List<KafkaStream<K, V>>> consumerMap =
+            consumer.createMessageStreams(topicCntMap, keyDecoder, valDecoder);

         List<KafkaStream<K, V>> streams = consumerMap.get(topic);

         // Now launch all the consumer threads.
         executor = Executors.newFixedThreadPool(threads);

+        stopped = false;
+
         // Now create an object to consume the messages.
-        for (final KafkaStream<K,V> stream : streams) {
+        for (final KafkaStream<K, V> stream : streams) {
             executor.submit(new Runnable() {
                 @Override public void run() {
-
-                    ConsumerIterator<K, V> it = stream.iterator();
-
-                    while (it.hasNext()) {
-                        final MessageAndMetadata<K, V> messageAndMetadata = it.next();
-                        getStreamer().addData(messageAndMetadata.key(), messageAndMetadata.message());
+                    while (!stopped) {
+                        try {
+                            for (ConsumerIterator<K, V> it = stream.iterator(); it.hasNext() && !stopped; ) {
+                                MessageAndMetadata<K, V> msg = it.next();
+
+                                try {
+                                    getStreamer().addData(msg.key(), msg.message());
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Message is ignored due to an error [msg=" + msg + ']', e);
+                                }
+                            }
+                        }
+                        catch (Exception e) {
+                            U.warn(log, "Message can't be consumed from stream. Retry after " +
+                                retryTimeout + " ms.", e);
+
+                            try {
+                                Thread.sleep(retryTimeout);
+                            }
+                            catch (InterruptedException ie) {
+                                // No-op.
+                            }
+                        }
                     }
                 }
             });
@@ -159,6 +198,8 @@ public class KafkaStreamer<T, K, V>
      * Stops streamer.
      */
     public void stop() {
+        stopped = true;
+
         if (consumer != null)
             consumer.shutdown();

@@ -168,11 +209,11 @@ public class KafkaStreamer<T, K, V>
             try {
                 if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
                     if (log.isDebugEnabled())
-                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly.");
             }
             catch (InterruptedException e) {
                 if (log.isDebugEnabled())
-                    log.debug("Interrupted during shutdown, exiting uncleanly");
+                    log.debug("Interrupted during shutdown, exiting uncleanly.");
             }
         }
     }


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
index 28533f7..98b9e4c 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -17,7 +17,6 @@

 package org.apache.ignite.stream.kafka;

-import org.apache.commons.io.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.zookeeper.server.*;

@@ -25,6 +24,7 @@ import kafka.admin.*;
 import kafka.api.*;
 import kafka.api.Request;
 import kafka.producer.*;
+import kafka.serializer.*;
 import kafka.server.*;
 import kafka.utils.*;
 import org.I0Itec.zkclient.*;
@@ -39,106 +39,104 @@ import java.util.concurrent.*;
  * Kafka Embedded Broker.
  */
 public class KafkaEmbeddedBroker {
-
-    /** Default ZooKeeper Host. */
+    /** Default ZooKeeper host. */
     private static final String ZK_HOST = "localhost";

-    /** Broker Port. */
+    /** Broker port. */
     private static final int BROKER_PORT = 9092;

-    /** ZooKeeper Connection Timeout. */
+    /** ZooKeeper connection timeout. */
     private static final int ZK_CONNECTION_TIMEOUT = 6000;

-    /** ZooKeeper Session Timeout. */
+    /** ZooKeeper session timeout. */
     private static final int ZK_SESSION_TIMEOUT = 6000;

     /** ZooKeeper port. */
     private static int zkPort = 0;

-    /** Is ZooKeeper Ready. */
+    /** Is ZooKeeper ready. */
     private boolean zkReady;

-    /** Kafka Config. */
-    private KafkaConfig brokerConfig;
+    /** Kafka config. */
+    private KafkaConfig brokerCfg;

-    /** Kafka Server. */
-    private KafkaServer kafkaServer;
+    /** Kafka server. */
+    private KafkaServer kafkaSrv;

-    /** ZooKeeper Client. */
+    /** ZooKeeper client. */
     private ZkClient zkClient;

     /** Embedded ZooKeeper. */
     private EmbeddedZooKeeper zooKeeper;

     /**
-     * Creates an embedded Kafka Broker.
+     * Creates an embedded Kafka broker.
      */
     public KafkaEmbeddedBroker() {
         try {
             setupEmbeddedZooKeeper();
+
             setupEmbeddedKafkaServer();
         }
         catch (IOException | InterruptedException e) {
-            throw new RuntimeException("failed to start Kafka Broker " + e);
+            throw new RuntimeException("Failed to start Kafka broker.", e);
         }
-
     }

     /**
-     * @return ZooKeeper Address.
+     * @return ZooKeeper address.
      */
     public static String getZKAddress() {
-        return ZK_HOST + ":" + zkPort;
+        return ZK_HOST + ':' + zkPort;
     }

     /**
      * Creates a Topic.
      *
-     * @param topic topic name
+     * @param topic Topic name.
+     * @param partitions Number of partitions for the topic.
+     * @param replicationFactor Replication factor.
+     * @throws TimeoutException If operation is timed out.
+     * @throws InterruptedException If interrupted.
      */
-    public void createTopic(String topic, final int partitions, final int replicationFactor)
+    public void createTopic(String topic, int partitions, int replicationFactor)
         throws TimeoutException, InterruptedException {
         AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+
         waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
     }

     /**
-     * Sends message to Kafka Broker.
+     * Sends messages to Kafka broker.
      *
-     * @param keyedMessages List of Keyed Messages.
+     * @param keyedMessages List of keyed messages.
      * @return Producer used to send the message.
      */
-    public Producer sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
         Producer<String, String> producer = new Producer<>(getProducerConfig());
+
         producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
         return producer;
     }

     /**
-     * Shuts down Kafka Broker.
-     *
-     * @throws IOException
+     * Shuts down Kafka broker.
      */
-    public void shutdown()
-        throws IOException {
-
+    public void shutdown() {
         zkReady = false;

-        if (kafkaServer != null)
-            kafkaServer.shutdown();
+        if (kafkaSrv != null)
+            kafkaSrv.shutdown();

-        List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+        List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());

-        for (String logDir : logDirs) {
-            FileUtils.deleteDirectory(new File(logDir));
-        }
+        for (String logDir : logDirs)
+            U.delete(new File(logDir));

         if (zkClient != null) {
             zkClient.close();
+
             zkClient = null;
         }

@@ -148,16 +146,15 @@ public class KafkaEmbeddedBroker {
                 zooKeeper.shutdown();
             }
             catch (IOException e) {
-                // ignore
+                // No-op.
             }

             zooKeeper = null;
         }
-
     }

     /**
-     * @return the Zookeeper Client
+     * @return ZooKeeper client.
      */
     private ZkClient getZkClient() {
         A.ensure(zkReady, "Zookeeper not setup yet");

@@ -169,102 +166,105 @@ public class KafkaEmbeddedBroker {
     /**
      * Checks if topic metadata is propagated.
      *
-     * @param topic topic name
-     * @param partition partition
-     * @return true if propagated otherwise false
+     * @param topic Topic name.
+     * @param part Partition.
+     * @return {@code True} if propagated, otherwise {@code false}.
      */
-    private boolean isMetadataPropagated(final String topic, final int partition) {
-        final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
-            topic, partition);
-        if (partitionStateOption.isDefined()) {
-            final PartitionStateInfo partitionState = partitionStateOption.get();
-            final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
-            if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
-                && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
-                && leaderAndInSyncReplicas.isr().size() >= 1)
-                return true;
+    private boolean isMetadataPropagated(String topic, int part) {
+        scala.Option<PartitionStateInfo> partStateOption =
+            kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);

-        }
-        return false;
+        if (!partStateOption.isDefined())
+            return false;
+
+        PartitionStateInfo partState = partStateOption.get();
+
+        LeaderAndIsr leaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+        return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
+            Request.isValidBrokerId(leaderAndIsr.leader()) && leaderAndIsr.isr().size() >= 1;
     }

     /**
      * Waits until metadata is propagated.
      *
-     * @param topic topic name
-     * @param partition partition
-     * @param timeout timeout value in millis
-     * @param interval interval in millis to sleep
-     * @throws TimeoutException
-     * @throws InterruptedException
+     * @param topic Topic name.
+     * @param part Partition.
+     * @param timeout Timeout value in millis.
+     * @param interval Interval in millis to sleep.
+     * @throws TimeoutException If operation is timed out.
+     * @throws InterruptedException If interrupted.
      */
-    private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
-        final long interval) throws TimeoutException, InterruptedException {
+    private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
+        throws TimeoutException, InterruptedException {
         int attempt = 1;
-        final long startTime = System.currentTimeMillis();
+
+        long startTime = System.currentTimeMillis();

         while (true) {
-            if (isMetadataPropagated(topic, partition))
+            if (isMetadataPropagated(topic, part))
                 return;

-            final long duration = System.currentTimeMillis() - startTime;
+            long duration = System.currentTimeMillis() - startTime;

             if (duration < timeout)
                 Thread.sleep(interval);
             else
-                throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+                throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);

             attempt++;
         }
-
     }

     /**
-     * Sets up embedded Kafka Server
+     * Sets up embedded Kafka server.
      *
-     * @throws IOException
+     * @throws IOException If failed.
      */
-    private void setupEmbeddedKafkaServer()
-        throws IOException {
+    private void setupEmbeddedKafkaServer() throws IOException {
         A.ensure(zkReady, "Zookeeper should be setup before hand");

-        brokerConfig = new KafkaConfig(getBrokerConfig());
-        kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
-        kafkaServer.startup();
+        brokerCfg = new KafkaConfig(getBrokerConfig());
+
+        kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
+
+        kafkaSrv.startup();
     }

     /**
-     * Sets up embedded zooKeeper
+     * Sets up embedded ZooKeeper.
      *
-     * @throws IOException
-     * @throws InterruptedException
+     * @throws IOException If failed.
+     * @throws InterruptedException If interrupted.
      */
-    private void setupEmbeddedZooKeeper()
-        throws IOException, InterruptedException {
+    private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
         EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+
         zooKeeper.startup();
+
         zkPort = zooKeeper.getActualPort();
+
         zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+
         zkReady = true;
     }

     /**
-     * @return Kafka Broker Address.
+     * @return Kafka broker address.
      */
     private static String getBrokerAddress() {
-        return ZK_HOST + ":" + BROKER_PORT;
+        return ZK_HOST + ':' + BROKER_PORT;
     }

     /**
-     * Gets KafKa Brofer Config
+     * Gets Kafka broker config.
      *
-     * @return Kafka Broker Config
-     * @throws IOException
+     * @return Kafka broker config.
+     * @throws IOException If failed.
      */
-    private static Properties getBrokerConfig()
-        throws IOException {
+    private static Properties getBrokerConfig() throws IOException {
         Properties props = new Properties();
+
         props.put("broker.id", "0");
         props.put("host.name", ZK_HOST);
         props.put("port", "" + BROKER_PORT);
@@ -272,60 +272,63 @@ public class KafkaEmbeddedBroker {
         props.put("zookeeper.connect", getZKAddress());
         props.put("log.flush.interval.messages", "1");
         props.put("replica.socket.timeout.ms", "1500");
+
         return props;
     }

     /**
-     * @return Kafka Producer Config
+     * @return Kafka producer config.
      */
     private static ProducerConfig getProducerConfig() {
         Properties props = new Properties();
+
         props.put("metadata.broker.list", getBrokerAddress());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
-        props.put("partitioner.class", "org.apache.ignite.kafka.SimplePartitioner");
+        props.put("serializer.class", StringEncoder.class.getName());
+        props.put("key.serializer.class", StringEncoder.class.getName());
+        props.put("partitioner.class", SimplePartitioner.class.getName());
+
         return new ProducerConfig(props);
     }

     /**
-     * Creates Temp Directory
+     * Creates temp directory.
      *
-     * @param prefix prefix
-     * @return Created File.
-     * @throws IOException
+     * @param prefix Prefix.
+     * @return Created file.
+     * @throws IOException If failed.
      */
-    private static File createTempDir(final String prefix)
-        throws IOException {
-        final Path path = Files.createTempDirectory(prefix);
-        return path.toFile();
+    private static File createTempDir(String prefix) throws IOException {
+        Path path = Files.createTempDirectory(prefix);
+
+        return path.toFile();
     }

     /**
-     * Creates Embedded ZooKeeper.
+     * Creates embedded ZooKeeper.
      */
     private static class EmbeddedZooKeeper {
-        /** Default ZooKeeper Host. */
+        /** Default ZooKeeper host. */
         private final String zkHost;

-        /** Default ZooKeeper Port. */
+        /** Default ZooKeeper port. */
         private final int zkPort;

-        /** NIO Context Factory. */
+        /** NIO context factory. */
         private NIOServerCnxnFactory factory;

-        /** Snapshot Directory. */
+        /** Snapshot directory. */
         private File snapshotDir;

-        /** Log Directory. */
+        /** Log directory. */
         private File logDir;

         /**
-         * Creates an embedded Zookeeper
-         * @param zkHost zookeeper host
-         * @param zkPort zookeeper port
+         * Creates an embedded ZooKeeper.
+         *
+         * @param zkHost ZooKeeper host.
+         * @param zkPort ZooKeeper port.
          */
-        EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+        EmbeddedZooKeeper(String zkHost, int zkPort) {
             this.zkHost = zkHost;
             this.zkPort = zkPort;
         }
@@ -333,22 +336,25 @@ public class KafkaEmbeddedBroker {
         /**
          * Starts up ZooKeeper.
          *
-         * @throws IOException
-         * @throws InterruptedException
+         * @throws IOException If failed.
+         * @throws InterruptedException If interrupted.
          */
-        void startup()
-            throws IOException, InterruptedException {
+        void startup() throws IOException, InterruptedException {
            snapshotDir = createTempDir("_ss");
+
            logDir = createTempDir("_log");
-            ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+
+            ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
+
            factory = new NIOServerCnxnFactory();
+
            factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
-            factory.startup(zooServer);
+
+            factory.startup(zkSrv);
        }

        /**
-         *
-         * @return actual port zookeeper is started
+         * @return Actual port on which ZooKeeper is started.
         */
        int getActualPort() {
            return factory.getLocalPort();
@@ -357,17 +363,16 @@ public class KafkaEmbeddedBroker {
        /**
         * Shuts down ZooKeeper.
         *
-         * @throws IOException
+         * @throws IOException If failed.
         */
-        void shutdown()
-            throws IOException {
+        void shutdown() throws IOException {
            if (factory != null) {
                factory.shutdown();

                U.delete(snapshotDir);
+
                U.delete(logDir);
            }
        }
    }
-
 }


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 5972639..2473990 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -32,31 +32,31 @@ import java.util.*;
 import java.util.concurrent.*;

 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.*;

 /**
  * Tests {@link KafkaStreamer}.
  */
-public class KafkaIgniteStreamerSelfTest
-    extends GridCommonAbstractTest {
+public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     /** Embedded Kafka. */
     private KafkaEmbeddedBroker embeddedBroker;

     /** Count. */
     private static final int CNT = 100;

-    /** Test Topic. */
+    /** Test topic. */
     private static final String TOPIC_NAME = "page_visits";

-    /** Kafka Partition. */
+    /** Kafka partition. */
     private static final int PARTITIONS = 4;

-    /** Kafka Replication Factor. */
+    /** Kafka replication factor. */
     private static final int REPLICATION_FACTOR = 1;

-    /** Topic Message Key Prefix. */
+    /** Topic message key prefix. */
     private static final String KEY_PREFIX = "192.168.2.";

-    /** Topic Message Value Url. */
+    /** Topic message value URL. */
     private static final String VALUE_URL = ",www.example.com,";

     /** Constructor. */
@@ -65,18 +65,15 @@ public class KafkaIgniteStreamerSelfTest
     }

     /** {@inheritDoc} */
-    @Override
-    protected void beforeTest()
-        throws Exception {
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());

         embeddedBroker = new KafkaEmbeddedBroker();
     }

     /** {@inheritDoc} */
-    @Override
-    protected void afterTest()
-        throws Exception {
+    @Override protected void afterTest() throws Exception {
         grid().cache(null).clear();

         embeddedBroker.shutdown();
@@ -85,76 +82,80 @@ public class KafkaIgniteStreamerSelfTest

     /**
      * Tests Kafka streamer.
      *
-     * @throws TimeoutException
-     * @throws InterruptedException
+     * @throws TimeoutException If timed out.
+     * @throws InterruptedException If interrupted.
      */
-    public void testKafkaStreamer()
-        throws TimeoutException, InterruptedException {
+    public void testKafkaStreamer() throws TimeoutException, InterruptedException {
         embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);

-        Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
-        consumerStream(TOPIC_NAME, keyValueMap);
+        Map<String, String> keyValMap = produceStream(TOPIC_NAME);
+
+        consumerStream(TOPIC_NAME, keyValMap);
     }

     /**
-     * Produces/Sends messages to Kafka.
+     * Sends messages to Kafka.
      *
      * @param topic Topic name.
      * @return Map of key value messages.
      */
-    private Map<String, String> produceStream(final String topic) {
-        final Map<String, String> keyValueMap = new HashMap<>();
-
+    private Map<String, String> produceStream(String topic) {
         // Generate random subnets.
         List<Integer> subnet = new ArrayList<>();

-        int i = 0;
-        while (i <= CNT)
-            subnet.add(++i);
+        for (int i = 1; i <= CNT; i++)
+            subnet.add(i);

         Collections.shuffle(subnet);

-        final List<KeyedMessage<String, String>> messages = new ArrayList<>();
-        for (int event = 0; event < CNT; event++) {
-            long runtime = new Date().getTime();
-            String ip = KEY_PREFIX + subnet.get(event);
+        List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String ip = KEY_PREFIX + subnet.get(evt);
+
             String msg = runtime + VALUE_URL + ip;
+
             messages.add(new KeyedMessage<>(topic, ip, msg));
-            keyValueMap.put(ip, msg);
+
+            keyValMap.put(ip, msg);
         }

-        final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+        Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+
         producer.close();

-        return keyValueMap;
+        return keyValMap;
     }

     /**
-     * Consumes Kafka Stream via ignite.
+     * Consumes Kafka stream via Ignite.
      *
      * @param topic Topic name.
-     * @param keyValueMap Expected key value map.
-     * @throws TimeoutException TimeoutException.
-     * @throws InterruptedException InterruptedException.
+     * @param keyValMap Expected key value map.
+     * @throws TimeoutException If timed out.
+     * @throws InterruptedException If interrupted.
      */
-    private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+    private void consumerStream(String topic, Map<String, String> keyValMap)
         throws TimeoutException, InterruptedException {
         KafkaStreamer<String, String, String> kafkaStmr = null;

-        final Ignite ignite = grid();
-        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+        Ignite ignite = grid();

+        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
             stmr.allowOverwrite(true);
             stmr.autoFlushFrequency(10);

-            // Configure socket streamer.
+            // Configure Kafka streamer.
             kafkaStmr = new KafkaStreamer<>();

             // Get the cache.
             IgniteCache<String, String> cache = ignite.cache(null);

-            // Set ignite instance.
+            // Set Ignite instance.
             kafkaStmr.setIgnite(ignite);

             // Set data streamer instance.
@@ -167,58 +168,55 @@ public class KafkaIgniteStreamerSelfTest
             kafkaStmr.setThreads(4);

             // Set the consumer configuration.
-            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
-                "groupX"));
+            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));

             // Set the decoders.
-            StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
-            kafkaStmr.setKeyDecoder(stringDecoder);
-            kafkaStmr.setValueDecoder(stringDecoder);
+            StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
+
+            kafkaStmr.setKeyDecoder(strDecoder);
+            kafkaStmr.setValueDecoder(strDecoder);

             // Start kafka streamer.
             kafkaStmr.start();

             final CountDownLatch latch = new CountDownLatch(CNT);
+
             IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
-                @Override
-                public boolean apply(UUID uuid, CacheEvent evt) {
+                @Override public boolean apply(UUID uuid, CacheEvent evt) {
                     latch.countDown();
+
                     return true;
                 }
             };

             ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
-            latch.await();

-            for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
-                final String key = entry.getKey();
-                final String value = entry.getValue();
+            latch.await();

-                final String cacheValue = cache.get(key);
-                assertEquals(value, cacheValue);
-            }
+            for (Map.Entry<String, String> entry : keyValMap.entrySet())
+                assertEquals(entry.getValue(), cache.get(entry.getKey()));
         }
         finally {
-            // Shutdown kafka streamer.
-            kafkaStmr.stop();
+            if (kafkaStmr != null)
+                kafkaStmr.stop();
         }
     }

     /**
      * Creates default consumer config.
      *
-     * @param zooKeeper Zookeeper address <server:port>.
-     * @param groupId Group Id for kafka subscriber.
-     * @return {@link ConsumerConfig} kafka consumer configuration.
+     * @param zooKeeper ZooKeeper address <server:port>.
+     * @param grpId Group Id for kafka subscriber.
+     * @return Kafka consumer configuration.
      */
-    private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+    private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String grpId) {
         A.notNull(zooKeeper, "zookeeper");
-        A.notNull(groupId, "groupId");
+        A.notNull(grpId, "groupId");

         Properties props = new Properties();
+
         props.put("zookeeper.connect", zooKeeper);
-        props.put("group.id", groupId);
+        props.put("group.id", grpId);
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "200");
         props.put("auto.commit.interval.ms", "1000");
@@ -226,5 +224,4 @@ public class KafkaIgniteStreamerSelfTest

         return new ConsumerConfig(props);
     }
-
 }


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/63fce5a4/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
index b836b44..1ef4f77 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -18,29 +18,36 @@
 package org.apache.ignite.stream.kafka;

 import kafka.producer.*;
+import kafka.utils.*;

 /**
- * Simple Partitioner for Kafka.
+ * Simple partitioner for Kafka.
 */
@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner
-    implements Partitioner {
+public class SimplePartitioner implements Partitioner {
+    /**
+     * Constructs instance.
+     *
+     * @param props Properties.
+     */
+    public SimplePartitioner(VerifiableProperties props) {
+        // No-op.
+    }

     /**
      * Partitions the key based on the key value.
      *
      * @param key Key.
-     * @param partitionSize Partition size.
+     * @param partSize Partition size.
      * @return partition Partition.
      */
-    public int partition(Object key, int partitionSize) {
-        int partition = 0;
+    public int partition(Object key, int partSize) {
         String keyStr = (String)key;
+
         String[] keyValues = keyStr.split("\\.");
+
         Integer intKey = Integer.parseInt(keyValues[3]);
-        if (intKey > 0) {
-            partition = intKey % partitionSize;
-        }
-        return partition;
+
+        return intKey > 0 ? intKey % partSize : 0;
     }
 }
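----------------------------------------------------------------------

For reference, the sketch below shows how the reworked streamer is wired up and
torn down, modeled on KafkaIgniteStreamerSelfTest above. It is a minimal sketch,
not code from this commit: the cache name, topic, consumer group and ZooKeeper
address are illustrative placeholders, and setIgnite()/setStreamer() are assumed
to be inherited from the StreamAdapter base class.

    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteDataStreamer;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.stream.kafka.KafkaStreamer;

    public class KafkaStreamerUsageSketch {
        public static void main(String[] args) throws Exception {
            // Assumes a Kafka broker and ZooKeeper are already running;
            // all names and addresses below are placeholders.
            try (Ignite ignite = Ignition.start()) {
                ignite.getOrCreateCache("mycache");

                try (IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache")) {
                    dataStreamer.allowOverwrite(true);

                    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

                    kafkaStreamer.setIgnite(ignite);
                    kafkaStreamer.setStreamer(dataStreamer);
                    kafkaStreamer.setTopic("page_visits");
                    kafkaStreamer.setThreads(4);

                    // New in this commit: on a consumption error each worker
                    // thread sleeps this long before retrying (must be > 0).
                    kafkaStreamer.setRetryTimeout(10000);

                    Properties props = new Properties();

                    props.put("zookeeper.connect", "localhost:2181");
                    props.put("group.id", "groupX");

                    kafkaStreamer.setConsumerConfig(new ConsumerConfig(props));

                    StringDecoder decoder = new StringDecoder(new VerifiableProperties());

                    kafkaStreamer.setKeyDecoder(decoder);
                    kafkaStreamer.setValueDecoder(decoder);

                    // Spawns 'threads' consumer threads that pump Kafka
                    // messages into the data streamer until stop() is called.
                    kafkaStreamer.start();

                    try {
                        Thread.sleep(60000); // Stream for a while.
                    }
                    finally {
                        // Raises the new 'stopped' flag, then shuts down the
                        // consumer connector and the worker thread pool.
                        kafkaStreamer.stop();
                    }
                }
            }
        }
    }

Calling stop() before the data streamer is closed mirrors the test's cleanup
order, so no worker thread adds data to an already-closed streamer.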