bbejeck commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1940199139


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -662,87 +418,30 @@ public synchronized Topology addSink(final String name,
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic,
-     * using the supplied partitioner.
-     * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}.
-     * <p>
-     * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
-     * the named Kafka topic's partitions.
-     * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state
-     * stores} in its processors.
-     * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute
-     * records among partitions using Kafka's default partitioning logic.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param partitioner the function that should be used to determine the partition for each record processed by the sink
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, Serializer, Serializer, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
-    public synchronized <K, V> Topology addSink(final String name,
-                                                final String topic,
-                                                final StreamPartitioner<? super K, ? super V> partitioner,
-                                                final String... parentNames) {
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final StreamPartitioner<?, ?> partitioner,
+                                         final String... parentNames) {
         internalTopologyBuilder.addSink(name, topic, null, null, partitioner, parentNames);
         return this;
     }
 
     /**
-     * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the specified key and value serializers.
-     *
-     * @param name the unique name of the sink
-     * @param topic the name of the Kafka topic to which this sink should write its records
-     * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink
-     * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamsConfig stream configuration}
-     * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
-     * and write to its topic
-     * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
-     * @see #addSink(String, String, String...)
-     * @see #addSink(String, String, StreamPartitioner, String...)
-     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     * See {@link #addSink(String, String, String...)}.
      */
-    public synchronized <K, V> Topology addSink(final String name,
-                                                final String topic,
-                                                final Serializer<K> keySerializer,
-                                                final Serializer<V> valueSerializer,
-                                                final String... parentNames) {
+    public synchronized Topology addSink(final String name,
+                                         final String topic,
+                                         final Serializer<?> keySerializer,
+                                         final Serializer<?> valueSerializer,

Review Comment:
   I'm not sure; I'd have to play around with the code some, but I think it
   would have to do with the generics on the `ProcessorSupplier` interface
   declaration.



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -918,38 +630,96 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
     }
 
     /**
-     * Adds a read-only {@link StateStore} to the topology.
-     * <p>
-     * A read-only {@link StateStore} does not create a dedicated changelog topic but uses it's input topic as
-     * changelog; thus, the used topic should be configured with log compaction.
-     * <p>
-     * The <code>auto.offset.reset</code> property will be set to <code>earliest</code> for this topic.
-     * <p>
-     * The provided {@link ProcessorSupplier} will be used to create a processor for all messages received
-     * from the given topic. This processor should contain logic to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined store builder
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     */
-    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
-                                                                  final String sourceName,
-                                                                  final TimestampExtractor timestampExtractor,
-                                                                  final Deserializer<KIn> keyDeserializer,
-                                                                  final Deserializer<VIn> valueDeserializer,
-                                                                  final String topic,
-                                                                  final String processorName,
-                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
-        storeBuilder.withLoggingDisabled();
+     * Adds a read-only {@link StateStore state store} to the topology.
+     * The state store will be populated with data from the named source topic.
+     * State stores are sharded and the number of shards is determined at runtime by the number of input topic
+     * partitions for the source topic <em>and</em> the connected processors (if any).
+     * Read-only state stores can be accessed from "outside" using "Interactive Queries" (cf.,
+     * {@link KafkaStreams#store(StoreQueryParameters)} and {@link KafkaStreams#query(StateQueryRequest)}).
+     *
+     * <p>The {@code auto.offset.reset} property will be set to {@code "earliest"} for the source topic.
+     * If you want to specify a source specific {@link TimestampExtractor} you can use
+     * {@link #addReadOnlyStateStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     *
+     * <p>{@link #connectProcessorAndStateStores(String, String...) Connecting} a read-only state store to
+     * {@link #addProcessor(String, ProcessorSupplier, String...) processors} is optional.
+     * If not connected to any processor, the state store will still be created and can be queried via
+     * {@link KafkaStreams#store(StoreQueryParameters)} or {@link KafkaStreams#query(StateQueryRequest)}.
+     * If the state store is connected to another processor, each corresponding {@link Processor} instance in the
+     * topology has <em>read-only</em> access to a single shard of the state store.
+     * If you need write access to a state store, you can use a
+     * {@link #addStateStore(StoreBuilder, String...) "regular" state store} instead.
+     * If you need access to all data in a state store inside a {@link Processor}, you can use a (read-only)
+     * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)
+     * global state store}.
+     *
+     * <p>The provided {@link ProcessorSupplier} will be used to create {@link Processor} instances which will be used
+     * to process the records from the source topic.
+     * These {@link Processor processors} are the only ones with <em>write</em> access to the state store,
+     * and should contain logic to keep the {@link StateStore} up-to-date.
+     *
+     * <p>Read-only state stores are always enabled for fault-tolerance and recovery.
+     * In contrast to {@link #addStateStore(StoreBuilder, String...) "regular" state stores} no dedicated changelog
+     * topic will be created in Kafka though, but the source topic is used for recovery.
+     * Thus, the source topic should be configured with log compaction.
+     *
+     * @param storeBuilder
+     *        the {@link StoreBuilder} used to obtain {@link StateStore state store} instances (one per shard)
+     * @param sourceName
+     *        the unique name of the internally added {@link #addSource(String, String...) source}
+     * @param keyDeserializer
+     *        the {@link Deserializer} for record keys
+     *        (can be {@code null} to use the default key deserializer from {@link StreamsConfig})
+     * @param valueDeserializer
+     *        the {@link Deserializer} for record values
+     *        (can be {@code null} to use the default value deserializer from {@link StreamsConfig})
+     * @param topic
+     *        the source topic to read the data from
+     * @param processorName
+     *        the unique name of the internally added
+     *        {@link #addProcessor(String, ProcessorSupplier, String...) processor} which maintains the state store
+     * @param stateUpdateSupplier
+     *        the supplier used to obtain {@link Processor} instances, which maintain the state store
+     *
+     * @return itself
+     *
+     * @throws TopologyException
+     *         if the {@link StoreBuilder#name() state store} was already added, or
+     *         if the source or processor names are not unique, or
+     *         if the source topic has already been registered by another
+     *         {@link #addSink(String, String, String...) source}, read-only state store, or
+     *         {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store}
+     */
+    public synchronized <K, V> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                              final String sourceName,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Deserializer<V> valueDeserializer,
+                                                              final String topic,
+                                                              final String processorName,
+                                                              final ProcessorSupplier<K, V, Void, Void> stateUpdateSupplier) {
+        return addReadOnlyStateStore(
+            storeBuilder,
+            sourceName,
+            null,
+            keyDeserializer,
+            valueDeserializer,
+            topic,
+            processorName,
+            stateUpdateSupplier
+        );
+    }
 
+    /**
+     * See {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}.
+     */
+    public synchronized <K, V> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                              final String sourceName,
+                                                              final TimestampExtractor timestampExtractor,
+                                                              final Deserializer<K> keyDeserializer,
+                                                              final Deserializer<V> valueDeserializer,

Review Comment:
   +1 for keeping the generic types over `<?>` for consistency
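
   To illustrate what the shared type parameters buy here, below is a minimal
   sketch. It does not use the real Kafka classes: `Deserializer`, `Processor`,
   and `ProcessorSupplier` are hypothetical two-parameter stand-ins (the real
   `ProcessorSupplier` takes four type parameters), and `update` and `demo` are
   made-up names. With `<K, V>` on the method, the compiler checks that both
   deserializers produce the types the supplier's processors consume; with
   `<?>` that check would likely be lost.

```java
import java.nio.charset.StandardCharsets;

class GenericsSketch {
    // Hypothetical stand-ins for illustration only; not the Kafka interfaces.
    interface Deserializer<T> { T deserialize(byte[] data); }
    interface Processor<K, V> { String process(K key, V value); }
    interface ProcessorSupplier<K, V> { Processor<K, V> get(); }

    // K and V tie the deserializer output types to the processor input types.
    static <K, V> String update(final Deserializer<K> keyDeserializer,
                                final Deserializer<V> valueDeserializer,
                                final ProcessorSupplier<K, V> stateUpdateSupplier,
                                final byte[] rawKey,
                                final byte[] rawValue) {
        final K key = keyDeserializer.deserialize(rawKey);
        final V value = valueDeserializer.deserialize(rawValue);
        return stateUpdateSupplier.get().process(key, value);
    }

    static String demo() {
        final Deserializer<String> utf8 = data -> new String(data, StandardCharsets.UTF_8);
        final Deserializer<Integer> length = data -> data.length;
        final ProcessorSupplier<String, Integer> supplier =
            () -> (key, value) -> key.toUpperCase() + "=" + (value + 1);

        // update(length, length, supplier, ...) would be rejected at compile time:
        // K cannot be both Integer (from the deserializer) and String (from the supplier).
        return update(utf8, length, supplier,
                      "a".getBytes(StandardCharsets.UTF_8), new byte[3]);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

   If the parameters were declared as `Deserializer<?>`, a mismatched pair would
   compile and could only surface as a `ClassCastException` at runtime.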


