ignite-428 Implement IgniteKafkaStreamer to stream data from Apache Kafka
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2c41739d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2c41739d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2c41739d Branch: refs/heads/ignite-1056 Commit: 2c41739dcd83751270b6bd30c6f2595edc68a1b1 Parents: 9f6a7f9 Author: vishal.garg <vishal.g...@workday.com> Authored: Mon Jun 22 19:35:08 2015 -0700 Committer: agura <ag...@gridgain.com> Committed: Fri Jul 3 19:39:11 2015 +0300 ---------------------------------------------------------------------- modules/kafka/pom.xml | 128 +++++++ .../ignite/stream/kafka/KafkaStreamer.java | 179 +++++++++ .../stream/kafka/KafkaEmbeddedBroker.java | 373 +++++++++++++++++++ .../kafka/KafkaIgniteStreamerSelfTest.java | 230 ++++++++++++ .../ignite/stream/kafka/SimplePartitioner.java | 46 +++ pom.xml | 1 + 6 files changed, 957 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml new file mode 100644 index 0000000..165ec1c --- /dev/null +++ b/modules/kafka/pom.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-kafka</artifactId> + <version>1.1.1-SNAPSHOT</version> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.8.2.1</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + 
<version>2.4</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-all</artifactId> + <version>4.2</version> + </dependency> + + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + + <dependency> + <groupId>org.gridgain</groupId> + <artifactId>ignite-shmem</artifactId> + <version>1.0.0</version> + </dependency> + + <dependency> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils</artifactId> + <version>1.8.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java new file mode 100644 index 0000000..e0240ce --- /dev/null +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.stream.*;
+
+import kafka.consumer.*;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.*;
+import kafka.serializer.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Server that subscribes to topic messages from a Kafka broker and streams them as key-value pairs into an
+ * {@link org.apache.ignite.IgniteDataStreamer} instance.
+ * <p>
+ * Uses Kafka's High Level Consumer API to read messages from Kafka. Each message key and payload are decoded with
+ * the configured key and value decoders and handed to the data streamer's {@code addData(key, value)}.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example">Consumer Group
+ * Example</a>
+ */
+public class KafkaStreamer<T, K, V>
+    extends StreamAdapter<T, K, V> {
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Executor used to submit kafka streams. */
+    private ExecutorService executor;
+
+    /** Topic. */
+    private String topic;
+
+    /** Number of threads to process kafka streams. */
+    private int threads;
+
+    /** Kafka Consumer Config. */
+    private ConsumerConfig consumerConfig;
+
+    /** Key Decoder. */
+    private Decoder<K> keyDecoder;
+
+    /** Value Decoder. */
+    private Decoder<V> valueDecoder;
+
+    /** Kafka Consumer connector. */
+    private ConsumerConnector consumer;
+
+    /**
+     * Sets the topic to consume from.
+     *
+     * @param topic Topic Name.
+     */
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * Sets the number of consumer threads. The same number of Kafka streams is requested for the topic.
+     *
+     * @param threads Number of Threads.
+     */
+    public void setThreads(final int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets the consumer config.
+     *
+     * @param consumerConfig Consumer configuration.
+     */
+    public void setConsumerConfig(final ConsumerConfig consumerConfig) {
+        this.consumerConfig = consumerConfig;
+    }
+
+    /**
+     * Sets the key decoder.
+     *
+     * @param keyDecoder Key Decoder.
+     */
+    public void setKeyDecoder(final Decoder<K> keyDecoder) {
+        this.keyDecoder = keyDecoder;
+    }
+
+    /**
+     * Sets the value decoder.
+     *
+     * @param valueDecoder Value Decoder
+     */
+    public void setValueDecoder(final Decoder<V> valueDecoder) {
+        this.valueDecoder = valueDecoder;
+    }
+
+    /**
+     * Starts streamer.
+     * <p>
+     * Creates a Kafka consumer connector that requests {@code threads} streams for {@code topic}. Each stream is
+     * then consumed on its own thread of a fixed-size pool.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.notNull(topic, "topic");
+        A.notNull(keyDecoder, "key decoder");
+        A.notNull(valueDecoder, "value decoder");
+        A.notNull(consumerConfig, "kafka consumer config");
+        A.ensure(threads > 0, "threads > 0");
+
+        log = getIgnite().log();
+
+        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
+
+        Map<String, Integer> topicCntMap = new HashMap<>();
+
+        topicCntMap.put(topic, threads);
+
+        Map<String, List<KafkaStream<K, V>>> consumerMap =
+            consumer.createMessageStreams(topicCntMap, keyDecoder, valueDecoder);
+
+        List<KafkaStream<K, V>> streams = consumerMap.get(topic);
+
+        // One pool thread per requested stream.
+        executor = Executors.newFixedThreadPool(threads);
+
+        for (final KafkaStream<K, V> stream : streams) {
+            executor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        ConsumerIterator<K, V> it = stream.iterator();
+
+                        // Loops until the stream reports no more messages (presumably once the consumer
+                        // connector is shut down in stop() - confirm against Kafka docs).
+                        while (it.hasNext()) {
+                            MessageAndMetadata<K, V> msg = it.next();
+
+                            getStreamer().addData(msg.key(), msg.message());
+                        }
+                    }
+                    catch (Exception e) {
+                        // The Future returned by submit() is discarded, so without this the failure would be lost.
+                        log.error("Failed to stream data from Kafka, consumer thread terminated [topic=" +
+                            topic + ']', e);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Stops streamer.
+     * <p>
+     * Shuts down the consumer connector, then waits up to 5 seconds for the consumer threads to finish.
+     */
+    public void stop() {
+        if (consumer != null)
+            consumer.shutdown();
+
+        if (executor != null) {
+            executor.shutdown();
+
+            try {
+                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
+                    if (log.isDebugEnabled())
+                        log.debug("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+            }
+            catch (InterruptedException e) {
+                // Restore the interrupt status so the caller can still observe the interruption.
+                Thread.currentThread().interrupt();
+
+                if (log.isDebugEnabled())
+                    log.debug("Interrupted during shutdown, exiting uncleanly");
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
new file mode 100644
index 0000000..28533f7
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.commons.io.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.zookeeper.server.*;
+
+import kafka.admin.*;
+import kafka.api.*;
+import kafka.api.Request;
+import kafka.producer.*;
+import kafka.server.*;
+import kafka.utils.*;
+import org.I0Itec.zkclient.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Kafka Embedded Broker.
+ * <p>
+ * Starts an in-process ZooKeeper server and a single Kafka broker (id {@code 0}) on {@code localhost}, with data
+ * kept in temporary directories. Intended for tests.
+ */
+public class KafkaEmbeddedBroker {
+    /** Default ZooKeeper Host. */
+    private static final String ZK_HOST = "localhost";
+
+    /** Broker Port. */
+    private static final int BROKER_PORT = 9092;
+
+    /** ZooKeeper Connection Timeout. */
+    private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+    /** ZooKeeper Session Timeout. */
+    private static final int ZK_SESSION_TIMEOUT = 6000;
+
+    /** Maximum time in millis to wait for the metadata of one partition to propagate. */
+    private static final long METADATA_PROPAGATION_TIMEOUT = 10000;
+
+    /** Interval in millis between metadata propagation checks. */
+    private static final long METADATA_PROPAGATION_INTERVAL = 100;
+
+    /** ZooKeeper port. Starts at 0 (any free port) and is replaced by the actual port once ZooKeeper starts. */
+    private static int zkPort = 0;
+
+    /** Is ZooKeeper Ready. */
+    private boolean zkReady;
+
+    /** Kafka Config. */
+    private KafkaConfig brokerConfig;
+
+    /** Kafka Server. */
+    private KafkaServer kafkaServer;
+
+    /** ZooKeeper Client. */
+    private ZkClient zkClient;
+
+    /** Embedded ZooKeeper. */
+    private EmbeddedZooKeeper zooKeeper;
+
+    /**
+     * Creates an embedded Kafka Broker: starts ZooKeeper first, then the Kafka server.
+     *
+     * @throws RuntimeException If either server fails to start; the original failure is kept as the cause.
+     */
+    public KafkaEmbeddedBroker() {
+        try {
+            setupEmbeddedZooKeeper();
+            setupEmbeddedKafkaServer();
+        }
+        catch (IOException | InterruptedException e) {
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+
+            throw new RuntimeException("failed to start Kafka Broker " + e, e);
+        }
+    }
+
+    /**
+     * @return ZooKeeper Address.
+     */
+    public static String getZKAddress() {
+        return ZK_HOST + ":" + zkPort;
+    }
+
+    /**
+     * Creates a Topic and waits until the metadata of every one of its partitions has propagated.
+     *
+     * @param topic topic name
+     * @param partitions number of partitions for the topic
+     * @param replicationFactor replication factor
+     * @throws TimeoutException If metadata of some partition did not propagate in time.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    public void createTopic(String topic, final int partitions, final int replicationFactor)
+        throws TimeoutException, InterruptedException {
+        AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+
+        // Wait for all partitions, not only the first one: producers may send to any of them right away.
+        for (int part = 0; part < partitions; part++)
+            waitUntilMetadataIsPropagated(topic, part, METADATA_PROPAGATION_TIMEOUT, METADATA_PROPAGATION_INTERVAL);
+    }
+
+    /**
+     * Sends message to Kafka Broker.
+     *
+     * @param keyedMessages List of Keyed Messages.
+     * @return Producer used to send the message; the caller is responsible for closing it.
+     */
+    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+        Producer<String, String> producer = new Producer<>(getProducerConfig());
+
+        try {
+            producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+        }
+        catch (RuntimeException e) {
+            // The caller never receives the producer on failure, so release it here.
+            producer.close();
+
+            throw e;
+        }
+
+        return producer;
+    }
+
+    /**
+     * Shuts down Kafka Broker: stops the Kafka server, deletes its log directories, closes the ZooKeeper client
+     * and stops the embedded ZooKeeper.
+     *
+     * @throws IOException If a Kafka log directory cannot be deleted.
+     */
+    public void shutdown()
+        throws IOException {
+        zkReady = false;
+
+        if (kafkaServer != null)
+            kafkaServer.shutdown();
+
+        if (brokerConfig != null) {
+            List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerConfig.logDirs());
+
+            for (String logDir : logDirs)
+                FileUtils.deleteDirectory(new File(logDir));
+        }
+
+        if (zkClient != null) {
+            zkClient.close();
+            zkClient = null;
+        }
+
+        if (zooKeeper != null) {
+            try {
+                zooKeeper.shutdown();
+            }
+            catch (IOException ignored) {
+                // Best effort: a failure to stop ZooKeeper must not fail the broker shutdown.
+            }
+
+            zooKeeper = null;
+        }
+    }
+
+    /**
+     * @return the Zookeeper Client
+     */
+    private ZkClient getZkClient() {
+        A.ensure(zkReady, "Zookeeper not setup yet");
+        A.notNull(zkClient, "Zookeeper client is not yet initialized");
+
+        return zkClient;
+    }
+
+    /**
+     * Checks if topic metadata is propagated: the broker's metadata cache knows the partition, ZooKeeper has a
+     * leader for it, the leader id is valid and the in-sync replica set is non-empty.
+     *
+     * @param topic topic name
+     * @param partition partition
+     * @return true if propagated otherwise false
+     */
+    private boolean isMetadataPropagated(final String topic, final int partition) {
+        final scala.Option<PartitionStateInfo> partitionStateOption = kafkaServer.apis().metadataCache().getPartitionInfo(
+            topic, partition);
+
+        if (partitionStateOption.isDefined()) {
+            final PartitionStateInfo partitionState = partitionStateOption.get();
+            final LeaderAndIsr leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch().leaderAndIsr();
+
+            if (ZkUtils.getLeaderForPartition(getZkClient(), topic, partition) != null
+                && Request.isValidBrokerId(leaderAndInSyncReplicas.leader())
+                && leaderAndInSyncReplicas.isr().size() >= 1)
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Waits until metadata is propagated, polling every {@code interval} millis.
+     *
+     * @param topic topic name
+     * @param partition partition
+     * @param timeout timeout value in millis
+     * @param interval interval in millis to sleep
+     * @throws TimeoutException If metadata has not propagated within {@code timeout}.
+     * @throws InterruptedException If interrupted while sleeping.
+     */
+    private void waitUntilMetadataIsPropagated(final String topic, final int partition, final long timeout,
+        final long interval) throws TimeoutException, InterruptedException {
+        int attempt = 1;
+        final long startTime = System.currentTimeMillis();
+
+        while (true) {
+            if (isMetadataPropagated(topic, partition))
+                return;
+
+            final long duration = System.currentTimeMillis() - startTime;
+
+            if (duration < timeout)
+                Thread.sleep(interval);
+            else
+                throw new TimeoutException("metadata propagate timed out, attempt=" + attempt);
+
+            attempt++;
+        }
+    }
+
+    /**
+     * Sets up embedded Kafka Server. ZooKeeper must already be running.
+     *
+     * @throws IOException If the broker log directory cannot be created.
+     */
+    private void setupEmbeddedKafkaServer()
+        throws IOException {
+        A.ensure(zkReady, "Zookeeper should be setup before hand");
+
+        brokerConfig = new KafkaConfig(getBrokerConfig());
+        kafkaServer = new KafkaServer(brokerConfig, SystemTime$.MODULE$);
+        kafkaServer.startup();
+    }
+
+    /**
+     * Sets up embedded zooKeeper and connects a ZooKeeper client to it.
+     *
+     * @throws IOException If ZooKeeper cannot be started.
+     * @throws InterruptedException If interrupted while starting ZooKeeper.
+     */
+    private void setupEmbeddedZooKeeper()
+        throws IOException, InterruptedException {
+        // Assign the field (not a local) so that shutdown() can later stop this ZooKeeper instance.
+        zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
+        zooKeeper.startup();
+
+        zkPort = zooKeeper.getActualPort();
+
+        zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
+
+        zkReady = true;
+    }
+
+    /**
+     * @return Kafka Broker Address.
+     */
+    private static String getBrokerAddress() {
+        return ZK_HOST + ":" + BROKER_PORT;
+    }
+
+    /**
+     * Gets Kafka Broker Config.
+     *
+     * @return Kafka Broker Config
+     * @throws IOException If the temporary log directory cannot be created.
+     */
+    private static Properties getBrokerConfig()
+        throws IOException {
+        Properties props = new Properties();
+
+        props.put("broker.id", "0");
+        props.put("host.name", ZK_HOST);
+        props.put("port", "" + BROKER_PORT);
+        props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
+        props.put("zookeeper.connect", getZKAddress());
+        props.put("log.flush.interval.messages", "1");
+        props.put("replica.socket.timeout.ms", "1500");
+
+        return props;
+    }
+
+    /**
+     * @return Kafka Producer Config
+     */
+    private static ProducerConfig getProducerConfig() {
+        Properties props = new Properties();
+
+        props.put("metadata.broker.list", getBrokerAddress());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
+        // Derive the name from the class itself, so it cannot drift from the real package.
+        props.put("partitioner.class", SimplePartitioner.class.getName());
+
+        return new ProducerConfig(props);
+    }
+
+    /**
+     * Creates Temp Directory
+     *
+     * @param prefix prefix
+     * @return Created File.
+     * @throws IOException If the directory cannot be created.
+     */
+    private static File createTempDir(final String prefix)
+        throws IOException {
+        final Path path = Files.createTempDirectory(prefix);
+
+        return path.toFile();
+    }
+
+    /**
+     * Creates Embedded ZooKeeper.
+     */
+    private static class EmbeddedZooKeeper {
+        /** Default ZooKeeper Host. */
+        private final String zkHost;
+
+        /** Default ZooKeeper Port. */
+        private final int zkPort;
+
+        /** NIO Context Factory. */
+        private NIOServerCnxnFactory factory;
+
+        /** Snapshot Directory. */
+        private File snapshotDir;
+
+        /** Log Directory. */
+        private File logDir;
+
+        /**
+         * Creates an embedded Zookeeper
+         *
+         * @param zkHost zookeeper host
+         * @param zkPort zookeeper port ({@code 0} lets the OS pick a free port)
+         */
+        EmbeddedZooKeeper(final String zkHost, final int zkPort) {
+            this.zkHost = zkHost;
+            this.zkPort = zkPort;
+        }
+
+        /**
+         * Starts up ZooKeeper with snapshot and log data in fresh temporary directories.
+         *
+         * @throws IOException If directories cannot be created or the server cannot bind.
+         * @throws InterruptedException If interrupted while starting.
+         */
+        void startup()
+            throws IOException, InterruptedException {
+            snapshotDir = createTempDir("_ss");
+            logDir = createTempDir("_log");
+
+            ZooKeeperServer zooServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+
+            factory = new NIOServerCnxnFactory();
+            factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
+            factory.startup(zooServer);
+        }
+
+        /**
+         * @return actual port zookeeper is started
+         */
+        int getActualPort() {
+            return factory.getLocalPort();
+        }
+
+        /**
+         * Shuts down ZooKeeper and deletes its temporary directories.
+         *
+         * @throws IOException If shutdown fails.
+         */
+        void shutdown()
+            throws IOException {
+            if (factory != null) {
+                factory.shutdown();
+
+                U.delete(snapshotDir);
+                U.delete(logDir);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
new file mode 100644
index 0000000..5972639
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import org.apache.ignite.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import kafka.consumer.*;
+import kafka.producer.*;
+import kafka.serializer.*;
+import kafka.utils.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link KafkaStreamer}.
+ */
+public class KafkaIgniteStreamerSelfTest
+    extends GridCommonAbstractTest {
+    /** Embedded Kafka. */
+    private KafkaEmbeddedBroker embeddedBroker;
+
+    /** Count. */
+    private static final int CNT = 100;
+
+    /** Test Topic. */
+    private static final String TOPIC_NAME = "page_visits";
+
+    /** Kafka Partition. */
+    private static final int PARTITIONS = 4;
+
+    /** Kafka Replication Factor. */
+    private static final int REPLICATION_FACTOR = 1;
+
+    /** Topic Message Key Prefix. */
+    private static final String KEY_PREFIX = "192.168.2.";
+
+    /** Topic Message Value Url. */
+    private static final String VALUE_URL = ",www.example.com,";
+
+    /** Maximum time in seconds to wait until all streamed entries are put into the cache. */
+    private static final long AWAIT_TIMEOUT_SECS = 30;
+
+    /** Constructor. */
+    public KafkaIgniteStreamerSelfTest() {
+        super(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        embeddedBroker = new KafkaEmbeddedBroker();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try {
+            grid().cache(null).clear();
+        }
+        finally {
+            // Always shut the embedded broker down, even if clearing the cache fails.
+            embeddedBroker.shutdown();
+        }
+    }
+
+    /**
+     * Tests Kafka streamer.
+     *
+     * @throws TimeoutException If topic metadata did not propagate in time.
+     * @throws InterruptedException If interrupted.
+     */
+    public void testKafkaStreamer() throws TimeoutException, InterruptedException {
+        embeddedBroker.createTopic(TOPIC_NAME, PARTITIONS, REPLICATION_FACTOR);
+
+        Map<String, String> keyValueMap = produceStream(TOPIC_NAME);
+
+        consumerStream(TOPIC_NAME, keyValueMap);
+    }
+
+    /**
+     * Produces/Sends messages to Kafka.
+     *
+     * @param topic Topic name.
+     * @return Map of key value messages.
+     */
+    private Map<String, String> produceStream(final String topic) {
+        final Map<String, String> keyValueMap = new HashMap<>();
+
+        // Shuffled, distinct subnet suffixes, so every message gets a unique key.
+        List<Integer> subnet = new ArrayList<>(CNT);
+
+        for (int i = 1; i <= CNT; i++)
+            subnet.add(i);
+
+        Collections.shuffle(subnet);
+
+        final List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+
+        for (int evt = 0; evt < CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+            String ip = KEY_PREFIX + subnet.get(evt);
+            String msg = runtime + VALUE_URL + ip;
+
+            messages.add(new KeyedMessage<>(topic, ip, msg));
+
+            keyValueMap.put(ip, msg);
+        }
+
+        final Producer<String, String> producer = embeddedBroker.sendMessages(messages);
+
+        producer.close();
+
+        return keyValueMap;
+    }
+
+    /**
+     * Consumes Kafka Stream via ignite.
+     *
+     * @param topic Topic name.
+     * @param keyValueMap Expected key value map.
+     * @throws TimeoutException TimeoutException.
+     * @throws InterruptedException InterruptedException.
+     */
+    private void consumerStream(final String topic, final Map<String, String> keyValueMap)
+        throws TimeoutException, InterruptedException {
+        final Ignite ignite = grid();
+
+        IgniteEvents evts = ignite.events(ignite.cluster().forCacheNodes(null));
+
+        final CountDownLatch latch = new CountDownLatch(CNT);
+
+        IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+            @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        // Subscribe before any data is streamed. Otherwise early puts could be missed and the latch would never
+        // reach zero.
+        UUID lsnrId = evts.remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(null)) {
+            stmr.allowOverwrite(true);
+            stmr.autoFlushFrequency(10);
+
+            // Configure kafka streamer.
+            KafkaStreamer<String, String, String> kafkaStmr = new KafkaStreamer<>();
+
+            // Set ignite instance.
+            kafkaStmr.setIgnite(ignite);
+
+            // Set data streamer instance.
+            kafkaStmr.setStreamer(stmr);
+
+            // Set the topic.
+            kafkaStmr.setTopic(topic);
+
+            // Set the number of threads.
+            kafkaStmr.setThreads(4);
+
+            // Set the consumer configuration.
+            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(KafkaEmbeddedBroker.getZKAddress(),
+                "groupX"));
+
+            // Set the decoders.
+            StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
+
+            kafkaStmr.setKeyDecoder(stringDecoder);
+            kafkaStmr.setValueDecoder(stringDecoder);
+
+            try {
+                // Start kafka streamer.
+                kafkaStmr.start();
+
+                assertTrue("Timed out waiting for " + CNT + " cache puts.",
+                    latch.await(AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS));
+            }
+            finally {
+                // Stop consuming while the data streamer is still open, so consumer threads never write into
+                // a closed data streamer.
+                kafkaStmr.stop();
+            }
+        }
+        finally {
+            evts.stopRemoteListen(lsnrId);
+        }
+
+        IgniteCache<String, String> cache = ignite.cache(null);
+
+        for (Map.Entry<String, String> entry : keyValueMap.entrySet())
+            assertEquals(entry.getValue(), cache.get(entry.getKey()));
+    }
+
+    /**
+     * Creates default consumer config.
+     *
+     * @param zooKeeper Zookeeper address <server:port>.
+     * @param groupId Group Id for kafka subscriber.
+     * @return {@link ConsumerConfig} kafka consumer configuration.
+     */
+    private ConsumerConfig createDefaultConsumerConfig(String zooKeeper, String groupId) {
+        A.notNull(zooKeeper, "zookeeper");
+        A.notNull(groupId, "groupId");
+
+        Properties props = new Properties();
+
+        props.put("zookeeper.connect", zooKeeper);
+        props.put("group.id", groupId);
+        props.put("zookeeper.session.timeout.ms", "400");
+        props.put("zookeeper.sync.time.ms", "200");
+        props.put("auto.commit.interval.ms", "1000");
+        // Start from the earliest offset, so messages produced before the consumer joined are still read.
+        props.put("auto.offset.reset", "smallest");
+
+        return new ConsumerConfig(props);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
new file mode 100644
index 0000000..b836b44
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import kafka.producer.*;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Simple Partitioner for Kafka.
+ * <p>
+ * Expects keys shaped like dotted IPv4 strings (e.g. {@code "192.168.2.15"}) and partitions by the fourth segment.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class SimplePartitioner
+    implements Partitioner {
+    /**
+     * Creates a partitioner without configuration.
+     */
+    public SimplePartitioner() {
+        // No-op.
+    }
+
+    /**
+     * Creates a partitioner. The Kafka 0.8 producer instantiates {@code partitioner.class} reflectively through a
+     * constructor with this signature.
+     *
+     * @param props Producer properties (unused).
+     */
+    public SimplePartitioner(VerifiableProperties props) {
+        // No-op.
+    }
+
+    /**
+     * Partitions the key based on the numeric value of its fourth dot-separated segment.
+     *
+     * @param key Key; must be a {@link String} with at least four dot-separated segments, the fourth numeric.
+     * @param partitionSize Number of partitions.
+     * @return Fourth segment modulo {@code partitionSize}, or {@code 0} if that segment is not positive.
+     */
+    @Override public int partition(Object key, int partitionSize) {
+        String[] keyValues = ((String)key).split("\\.");
+
+        int intKey = Integer.parseInt(keyValues[3]);
+
+        return intKey > 0 ? intKey % partitionSize : 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2c41739d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6d1609..b47d34b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
         <module>modules/gce</module>
         <module>modules/cloud</module>
         <module>modules/mesos</module>
+        <module>modules/kafka</module>
     </modules>
 
     <profiles>