AyoubOm commented on code in PR #15619:
URL: https://github.com/apache/kafka/pull/15619#discussion_r1566096443
##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
return this;
}
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create an
+ * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+ * The supplier should always generate a new instance. Creating a single {@link Processor} object
+ * and returning the same object reference in {@link ProcessorSupplier#get()} is a
+ * violation of the supplier pattern and leads to runtime exceptions.
+ * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * It is not required to connect a global store to the {@link Processor Processors},
+ * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+ *
+ * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
+ * @param topic the topic to source the data from
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @param reprocessOnRestore restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte
Review Comment:
nit:
```suggestion
* @param reprocessOnRestore restore by reprocessing the data using a processor supplied by stateUpdateSupplier or load the data in byte for byte
```
I wonder whether it would be clearer to say "if true, restore ...; otherwise, load ...".
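To make the two branches concrete, here is a minimal, Kafka-free sketch of the semantics as the Javadoc describes them: `true` replays each record through the processor, `false` copies each record into the store unchanged. `RestoreModes`, `SourceRecord`, and `restore` are hypothetical names for illustration only, not Kafka Streams APIs, and strings stand in for the serialized bytes the real restore path would handle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of the two restore modes selected by reprocessOnRestore.
// None of these types are Kafka Streams APIs; they only illustrate the semantics.
public class RestoreModes {

    // A record as it sits in the input topic (strings stand in for raw bytes).
    record SourceRecord(String key, String value) {}

    // Rebuilds `store` from `topicRecords`.
    // reprocessOnRestore == true: every record goes through `processor`, so any
    // transformation the processor applies is reapplied during restore.
    // reprocessOnRestore == false: records are loaded as-is, bypassing the processor.
    static void restore(List<SourceRecord> topicRecords,
                        Map<String, String> store,
                        boolean reprocessOnRestore,
                        BiConsumer<SourceRecord, Map<String, String>> processor) {
        for (SourceRecord r : topicRecords) {
            if (reprocessOnRestore) {
                processor.accept(r, store);
            } else {
                store.put(r.key(), r.value());
            }
        }
    }

    public static void main(String[] args) {
        List<SourceRecord> topic = new ArrayList<>();
        topic.add(new SourceRecord("a", "hello"));
        topic.add(new SourceRecord("b", "world"));

        // A processor that upper-cases values before writing them to the store.
        BiConsumer<SourceRecord, Map<String, String>> upperCasing =
            (r, store) -> store.put(r.key(), r.value().toUpperCase());

        Map<String, String> reprocessed = new HashMap<>();
        restore(topic, reprocessed, true, upperCasing);

        Map<String, String> loaded = new HashMap<>();
        restore(topic, loaded, false, upperCasing);

        System.out.println(reprocessed.get("a")); // HELLO
        System.out.println(loaded.get("a"));      // hello
    }
}
```

With a transforming processor the two modes produce different store contents, which is the distinction an "if true, ...; otherwise, ..." wording would spell out.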
##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
return this;
}
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create an
Review Comment:
nit:
```suggestion
* The provided {@link ProcessorSupplier} will be used to create a
```
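The sentence this suggestion touches is followed in the Javadoc by the supplier-pattern rule: each `get()` must return a new processor. A plain-Java sketch of why, using `java.util.function.Supplier`; `CountingProcessor`, `GOOD`, and `BAD` are hypothetical stand-ins, not Kafka's `Processor` or `ProcessorSupplier`, and the sketch only shows the shared-state problem rather than the runtime exception the Javadoc mentions.

```java
import java.util.function.Supplier;

// Hypothetical, Kafka-free illustration of the supplier pattern: each get()
// must hand out a fresh processor, because each instance keeps its own state.
public class SupplierPattern {

    // Minimal stand-in for a stateful processor; NOT the Kafka Streams Processor interface.
    static final class CountingProcessor {
        private long processed = 0;

        void process(String key, String value) {
            processed++;
        }

        long processed() {
            return processed;
        }
    }

    // Correct: a new instance on every call.
    static final Supplier<CountingProcessor> GOOD = CountingProcessor::new;

    // Incorrect: the same shared instance on every call.
    private static final CountingProcessor SHARED = new CountingProcessor();
    static final Supplier<CountingProcessor> BAD = () -> SHARED;

    public static void main(String[] args) {
        CountingProcessor p1 = GOOD.get();
        CountingProcessor p2 = GOOD.get();
        p1.process("k", "v");
        System.out.println(p1 != p2);        // true
        System.out.println(p2.processed());  // 0: state is not shared

        CountingProcessor q1 = BAD.get();
        CountingProcessor q2 = BAD.get();
        q1.process("k", "v");
        System.out.println(q1 == q2);        // true
        System.out.println(q2.processed());  // 1: state leaks between "instances"
    }
}
```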
##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
return this;
}
+ /**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+ * <p>
+ * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+ * of the input topic.
+ * <p>
+ * The provided {@link ProcessorSupplier} will be used to create an
+ * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+ * The supplier should always generate a new instance. Creating a single {@link Processor} object
+ * and returning the same object reference in {@link ProcessorSupplier#get()} is a
+ * violation of the supplier pattern and leads to runtime exceptions.
+ * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+ * <p>
+ * It is not required to connect a global store to the {@link Processor Processors},
+ * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+ *
+ * @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
+ * @param topic the topic to source the data from
+ * @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+ * @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
+ * @param reprocessOnRestore restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte
+ * @return itself
+ * @throws TopologyException if the processor of state is already registered
+ */
+ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+ final String topic,
+ final Consumed<KIn, VIn> consumed,
+ final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
Review Comment:
I noticed that passing a ProcessorSupplier with Void, Void as its output key/value types rules out bulk writes into the state store, since the processor itself performs every write. Is there a plan to add APIs that take transformers instead (namely ProcessorSupplier<KIn, VIn, KOut, VOut>)? I think that would allow bulk writes.
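A toy model of the concern above: with `Void, Void` output the processor performs every write itself, one `put` per record, whereas if the processor forwarded its output, the caller could collect a batch and issue a single bulk write (Kafka's `KeyValueStore` does expose `putAll(List<KeyValue<K, V>>)`). Everything below (`CountingStore`, `processorWrites`, `frameworkWrites`) is hypothetical and only counts write calls; whether fewer calls actually speed up restoration depends on the underlying store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model: the store, processor shapes and the
// "framework" loop are illustrative assumptions, not Kafka Streams APIs.
public class BulkWriteSketch {

    // Toy key-value store that counts how many write calls it receives.
    static final class CountingStore {
        final Map<String, String> data = new HashMap<>();
        int writeCalls = 0;

        void put(String k, String v) {
            data.put(k, v);
            writeCalls++;
        }

        void putAll(List<Map.Entry<String, String>> entries) {
            for (Map.Entry<String, String> e : entries) {
                data.put(e.getKey(), e.getValue());
            }
            writeCalls++; // one call for the whole batch
        }
    }

    // Shape 1 (Void, Void output): the processor owns the writes,
    // so every record becomes its own put() and nothing can be batched.
    static void processorWrites(List<Map.Entry<String, String>> batch, CountingStore store) {
        for (Map.Entry<String, String> r : batch) {
            store.put(r.getKey(), r.getValue().toUpperCase());
        }
    }

    // Shape 2 (KOut, VOut output): the processor only transforms and "forwards";
    // the caller collects the output and issues a single putAll() per batch.
    static void frameworkWrites(List<Map.Entry<String, String>> batch, CountingStore store) {
        List<Map.Entry<String, String>> forwarded = new ArrayList<>();
        for (Map.Entry<String, String> r : batch) {
            forwarded.add(Map.entry(r.getKey(), r.getValue().toUpperCase()));
        }
        store.putAll(forwarded);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch =
            List.of(Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("c", "z"));

        CountingStore perRecord = new CountingStore();
        processorWrites(batch, perRecord);

        CountingStore bulk = new CountingStore();
        frameworkWrites(batch, bulk);

        System.out.println(perRecord.writeCalls);             // 3
        System.out.println(bulk.writeCalls);                  // 1
        System.out.println(perRecord.data.equals(bulk.data)); // true
    }
}
```

Both shapes end with identical store contents; only the number of write calls differs.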
--
This is an automated message from the Apache Git Service.