[
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Boyang Chen updated KAFKA-6840:
-------------------------------
Priority: Blocker (was: Major)
> support windowing in ktable API
> -------------------------------
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Blocker
> Labels: api, needs-kip
> Fix For: 3.0.0
>
>
> The StreamsBuilder provides table() API to materialize a changelog topic into
> a local key-value store (KTable), which is very convenient. However, current
> underlying implementation does not support materializing one topic to a
> windowed key-value store, which in certain cases would be very useful.
> To make up the gap, we proposed a new API in StreamsBuilder that could get a
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
> final Consumed<K, V>
> consumed,
> final Materialized<K, V,
> KeyValueStore<Bytes, byte[]>> materialized) {
> Objects.requireNonNull(topic, "topic can't be null");
> Objects.requireNonNull(consumed, "consumed can't be null");
> Objects.requireNonNull(materialized, "materialized can't be null");
>
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
> return internalStreamsBuilder.table(topic,
> new ConsumedInternal<>(consumed),
> new
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
> }
>
> Where we could see that the store type is given as KeyValueStore. There is no
> flexibility to change it to WindowStore.
>
> To maintain compatibility of the existing API, we have two options to define
> a new API:
> 1.Overload existing KTable struct
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String
> topic,
> final Consumed<K, V>
> consumed,
> final Materialized<K, V,
> WindowStore<Bytes, byte[]>> materialized);
>
> This could give developer an alternative to use windowed table instead.
> However, this implies that we need to make sure all the KTable logic still
> works as expected, such as join, aggregation, etc, so the challenge would be
> making sure all current KTable logics work.
>
> 2.Define a new type called WindowedKTable
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String
> topic,
> final Consumed<K, V>
> consumed,
> final Materialized<K, V,
> WindowStore<Bytes, byte[]>> materialized);
> The benefit of doing this is that we don’t need to worry about the existing
> functionality of KTable. However, the cost is to introduce redundancy of
> common operation logic. When upgrading common functionality, we need to take
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some
> feedbacks on the two approaches, thank you!
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)