https://github.com/bobby/kafka-streams-clojure

A [Clojure transducers](https://clojure.org/reference/transducers)
interface to [Kafka Streams](https://kafka.apache.org/documentation/streams).
This combination provides the best of both worlds for building
streaming applications on Kafka with Clojure:

* Simple, declarative, idiomatic, composable, testable stream
  transformation logic, via transducers
* Easy, battle-hardened distributed system topology specification,
  cluster partition rebalancing, local state management, etc. via Kafka
  Streams

## Status

**THIS LIBRARY IS CURRENTLY ALPHA STATUS, AND IS NOT FIT FOR PRODUCTION 
USE!**

I would love to hear from anyone who thinks this might be useful to
them, and about anything that is wrong with it.

### Features & Roadmap

Currently, this library supports:

* Hooking a transducer into a `KStream` processing pipeline.

In the future, I plan for this library to support:

* Helper transducers for stateful computations like joins, windowed
  aggregates, etc. to mirror the functionality of the `KStream` API,
  but which can be composed with purely functional steps
* An appropriate level of integration into both the low-level
  `Processor` API and the `KTable` APIs.

## Installation

**Note: Because of its alpha status, this library is not configured
for CI/CD, and no JARs have been pushed to a public repository.  You'll
have to install it (as per instructions below) into your local Maven
repo before the following instructions will work.**

Include the library JAR in your Boot/Leiningen dependencies:

``` clojure
[kafka-streams-clojure "0.1.0-SNAPSHOT"]
```

### Kafka Streams Dependency

Kafka Streams is included as a `provided` dependency, meaning your
application will need to include the
[Kafka Streams JAR](https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams)
as a dependency alongside this library.
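
As a rough sketch, a Leiningen `:dependencies` vector that pulls in
both JARs might look like the following.  The Kafka Streams version
shown is an assumption, chosen only because it is a release that still
ships `KStreamBuilder`; use whichever version matches your cluster.

``` clojure
;; Fragment of a project.clj; the kafka-streams version is an assumption.
:dependencies [[kafka-streams-clojure "0.1.0-SNAPSHOT"]
               [org.apache.kafka/kafka-streams "0.10.2.1"]]
```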

## Usage

Transducers provide a more Clojure-idiomatic way to transform
streaming key-value pairs than `KStream`'s Java 8 Streams-like API.
The key function is `kafka-streams-clojure.api/transduce-kstream`,
which makes the given `KStream` a transducible context by applying the
given transducer as a `Transformer`.  The step function is invoked
with the `ProcessorContext` and a 2-tuple of `[key value]` for each
record, so the transducer should be shaped accordingly.
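
To make that shape concrete, here is a minimal sketch of a step
function being driven with a context and `[key value]` pairs.  The
atom standing in for the `ProcessorContext` and the names
`forward-step` and `swap-kv` are hypothetical, used only for
illustration; this is not how the library itself is implemented.

``` clojure
;; Hypothetical stand-in for a ProcessorContext: an atom that collects
;; every record "forwarded" downstream.
(defn forward-step
  "Reducing function with the arities a transducer may call."
  ([] (atom []))                 ; init
  ([ctx] ctx)                    ; completion
  ([ctx [k v]]                   ; step: forward the pair, return the context
   (swap! ctx conj [k v])
   ctx))

;; A transducer written against [key value] pairs.
(def swap-kv (map (fn [[k v]] [v k])))

(def ctx (atom []))
(def step (swap-kv forward-step))

;; Each record arrives as one call: (step context [key value]).
(step ctx ["foo" "bar"])
(step ctx ["baz" "quux"])

@ctx
;; => [["bar" "foo"] ["quux" "baz"]]
```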

This library also aims to provide a number of stateful transducers
over Kafka Streams' Stores API for doing joins, windowed aggregates,
etc.; as noted under Features & Roadmap, these are not available yet.
The goal is to maintain feature parity with the high-level `KStream`,
`KTable`, etc. APIs, as well as (eventually) to enable transducer
usage in the low-level `Processor` API.

``` clojure
;; Start a Kafka cluster running locally

(require '[kafka-streams-clojure.api :as api])
(import '[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
        '[org.apache.kafka.streams StreamsConfig KafkaStreams]
        '[org.apache.kafka.streams.kstream KStreamBuilder])

(def xform (comp (filter (fn [[k v]] (string? v)))
                 (map (fn [[k v]] [v k]))
                 (filter (fn [[k v]] (= "foo" v)))))
(def builder (KStreamBuilder.))
(def kstream (-> builder
                 (.stream (into-array String ["tset"]))
                 (api/transduce-kstream xform)
                 (.to "test")))

(def kafka-streams
  (KafkaStreams. builder
                 (StreamsConfig. {StreamsConfig/APPLICATION_ID_CONFIG    "test-app-id"
                                  StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092"
                                  StreamsConfig/KEY_SERDE_CLASS_CONFIG   org.apache.kafka.common.serialization.Serdes$StringSerde
                                  StreamsConfig/VALUE_SERDE_CLASS_CONFIG org.apache.kafka.common.serialization.Serdes$StringSerde})))
(.start kafka-streams)

(def producer (KafkaProducer. {"bootstrap.servers" "localhost:9092"
                               "acks"              "all"
                               "retries"           "0"
                               "key.serializer"    "org.apache.kafka.common.serialization.StringSerializer"
                               "value.serializer"  "org.apache.kafka.common.serialization.StringSerializer"}))

@(.send producer (ProducerRecord. "tset" "foo" "bar"))
;; Observe the message come across topic "test" via kafka-console-consumer

@(.send producer (ProducerRecord. "tset" "baz" "quux"))
;; Observe that the message does not come across topic "test" via
;; kafka-console-consumer

(.close producer)
(.close kafka-streams)
```
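
Because the transformation logic is an ordinary transducer, it can
also be exercised at the REPL or in a unit test without a running
broker.  The sketch below repeats the `xform` from the example so that
it stands alone, and applies it to a plain vector of `[key value]`
pairs; the `records` data is made up for illustration.

``` clojure
;; Same transducer as in the Kafka example above.
(def xform (comp (filter (fn [[k v]] (string? v)))
                 (map (fn [[k v]] [v k]))
                 (filter (fn [[k v]] (= "foo" v)))))

;; Hypothetical sample records, each shaped as [key value].
(def records [["foo" "bar"]
              ["baz" "quux"]
              ["foo" 42]])

;; No Kafka involved: `into` is just another transducible context.
(into [] xform records)
;; => [["bar" "foo"]]
```

Only the first record survives: its value is a string, and after the
key and value are swapped the new value is `"foo"`.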
