pegasas commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730390328
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -820,110 +778,6 @@ public synchronized <KIn, VIn> Topology
addReadOnlyStateStore(final StoreBuilder
);
}
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka
Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to
consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()}
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
- * The default {@link TimestampExtractor} as specified in the {@link
StreamsConfig config} is used.
- *
- * @param storeBuilder user defined state store builder
- * @param sourceName name of the {@link SourceNode} that will
be automatically added
- * @param keyDeserializer the {@link Deserializer} to deserialize
keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize
values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already
registered
- * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder,
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
- */
- @Deprecated
- public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
- final String sourceName,
- final Deserializer<K>
keyDeserializer,
- final Deserializer<V>
valueDeserializer,
- final String topic,
- final String
processorName,
- final
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier)
{
- internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
- sourceName,
- null,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
- true
- );
- return this;
- }
-
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka
Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to
consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()}
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
- *
- * @param storeBuilder user defined key value store builder
- * @param sourceName name of the {@link SourceNode} that will
be automatically added
- * @param timestampExtractor the stateless timestamp extractor used for
this source,
- * if not specified the default extractor
defined in the configs will be used
- * @param keyDeserializer the {@link Deserializer} to deserialize
keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize
values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already
registered
- * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder,
String, TimestampExtractor, Deserializer, Deserializer, String, String,
ProcessorSupplier)} instead.
- */
- @Deprecated
- public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
- final String sourceName,
- final
TimestampExtractor timestampExtractor,
- final Deserializer<K>
keyDeserializer,
- final Deserializer<V>
valueDeserializer,
- final String topic,
- final String
processorName,
- final
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier)
{
- internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
- sourceName,
- timestampExtractor,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
Review Comment:
I agree with that. it seems a lot of interfaces has example/code reference.
we should divide and conquer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]