https://github.com/bobby/kafka-streams-clojure
Clojure transducers <https://clojure.org/reference/transducers> interface to Kafka Streams <https://kafka.apache.org/documentation/streams>. This combo provides the best of both worlds for building streaming applications on Kafka with Clojure: * Simple, declarative, idiomatic, composable, testable stream transformation logic, via transducers * Easy, battle-hardened distributed system topology specification, cluster partition rebalancing, local state management, etc. via Kafka Streams ## Status **THIS LIBRARY IS CURRENTLY ALPHA STATUS, AND IS NOT FIT FOR PRODUCTION USE!** I would love to hear from people who think this might be useful to them, or what's wrong with it. ### Features & Roadmap Currently, this library supports: * Hooking a transducer into a `KStream` processing pipeline. In the future, I plan for this library to support: * Helper transducers for stateful computations like joins, windowed aggregates, etc. to mirror the functionality of the `KStream` API, but which can be composed with purely functional steps * An appropriate level of integration into both the low-level `Processor` API and the `KTable` APIs. ## Installation **Note: Due to its alpha status, this library is not configured for CI/CD, and no JARs have been pushed to a public repository. You'll have to install (as per instructions below) into your local Maven repo before the following instructions will work** Include the library JAR in your Boot/Leiningen dependencies: ``` clojure [kafka-streams-clojure "0.1.0-SNAPSHOT"] ``` ### Kafka Streams Dependency Kafka Streams is included as a `provided` dependency, meaning your application will need to include the Kafka Streams JAR <https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams> as a dependency as well as this library. ## Usage Transducers provide a more Clojure-idiomatic way to transform streaming key value pairs than `KStream`'s Java 8 Streams-like API. The key function is `kafka-streams-clojure.api/transduce-kstream`, which makes the given `KStream` a transducible context by applying the given transducer as a `Transformer`. The step function is invoked with the `ProcessorContext` and a 2-tuple of `[key value]` for each record, so the transducer should be shaped accordingly. This library also provides a number of stateful transducers over Kafka Streams' Stores API for doing joins, windowed aggregates, etc. The goal of this library is to maintain feature parity with the high-level `KStream`, `KTable`, etc. APIs, as well as (eventually) to enable transducer usage in the low-level `Processor` API. ``` clojure // Start Kafka Cluster running locally (require '[kafka-streams-clojure.api :as api]) (import '[org.apache.kafka.clients.producer KafkaProducer ProducerRecord] '[org.apache.kafka.streams StreamsConfig KafkaStreams] '[org.apache.kafka.streams.kstream KStreamBuilder]) (def xform (comp (filter (fn [[k v]] (string? v))) (map (fn [[k v]] [v k])) (filter (fn [[k v]] (= "foo" v))))) (def builder (KStreamBuilder.)) (def kstream (-> builder (.stream (into-array String ["tset"])) (api/transduce-kstream xform) (.to "test"))) (def kafka-streams (KafkaStreams. builder (StreamsConfig. {StreamsConfig/APPLICATION_ID_CONFIG "test-app-id" StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092" StreamsConfig/KEY_SERDE_CLASS_CONFIG org.apache.kafka.common.serialization.Serdes$StringSerde StreamsConfig/VALUE_SERDE_CLASS_CONFIG org.apache.kafka.common.serialization.Serdes$StringSerde}))) (.start kafka-streams) (def producer (KafkaProducer. {"bootstrap.servers" "localhost:9092" "acks" "all" "retries" "0" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})) @(.send producer (ProducerRecord. "tset" "foo" "bar")) // Observe message come across topic "test" via kafka-console-consumer @(.send producer (ProducerRecord. "tset" "baz" "quux")) // Observe message does not come across topic "test" via kafka-console-consumer (.close producer) (.close kafka-streams) ``` -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to [email protected] Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to [email protected] For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. For more options, visit https://groups.google.com/d/optout.
