mjsax commented on code in PR #17190:
URL: https://github.com/apache/kafka/pull/17190#discussion_r1779732067


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java:
##########
@@ -312,79 +312,6 @@ public void testDrivingSimpleTopologyWithDroppingPartitioner() {
         assertTrue(outputTopic1.isEmpty());
     }
 
-    @Test
-    public void testDrivingStatefulTopology() {
-        final String storeName = "entries";
-        driver = new TopologyTestDriver(createStatefulTopology(storeName), 
props);
-        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
-        final TestOutputTopic<Integer, String> outputTopic1 =
-                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
-
-        inputTopic.pipeInput("key1", "value1");
-        inputTopic.pipeInput("key2", "value2");
-        inputTopic.pipeInput("key3", "value3");
-        inputTopic.pipeInput("key1", "value4");
-        assertTrue(outputTopic1.isEmpty());
-
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore(storeName);
-        assertEquals("value4", store.get("key1"));
-        assertEquals("value2", store.get("key2"));
-        assertEquals("value3", store.get("key3"));
-        assertNull(store.get("key4"));
-    }
-
-    @Test
-    public void testDrivingConnectedStateStoreTopology() {
-        driver = new 
TopologyTestDriver(createConnectedStateStoreTopology("connectedStore"), props);
-        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
-        final TestOutputTopic<Integer, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
-
-        inputTopic.pipeInput("key1", "value1");
-        inputTopic.pipeInput("key2", "value2");
-        inputTopic.pipeInput("key3", "value3");
-        inputTopic.pipeInput("key1", "value4");
-        assertTrue(outputTopic1.isEmpty());
-
-        final KeyValueStore<String, String> store = 
driver.getKeyValueStore("connectedStore");
-        assertEquals("value4", store.get("key1"));
-        assertEquals("value2", store.get("key2"));
-        assertEquals("value3", store.get("key3"));
-        assertNull(store.get("key4"));
-    }
-
-    @Deprecated // testing old PAPI
-    @Test
-    public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
-        final String storeName = "connectedStore";
-        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String());
-        topology
-            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_2)
-            .addProcessor("processor1", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source1")

Review Comment:
   `OldAPIStatefulProcessor` should be unused now. Can it be removed, too?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java:
##########
@@ -1569,24 +1064,6 @@ private Topology createSimpleTopologyWithDroppingPartitioner() {
                 .addSink("sink", OUTPUT_TOPIC_1, new DroppingPartitioner(), "processor");
     }
 
-    @Deprecated // testing old PAPI
-    private Topology createStatefulTopology(final String storeName) {
-        return topology
-            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor", define(new 
OldAPIStatefulProcessor(storeName)), "source")
-            
.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
 Serdes.String(), Serdes.String()), "processor")
-            .addSink("counts", OUTPUT_TOPIC_1, "processor");
-    }
-
-    @Deprecated // testing old PAPI
-    private Topology createConnectedStateStoreTopology(final String storeName) 
{
-        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String());
-        return topology
-            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor", defineWithStoresOldAPI(() -> new 
OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)), 
"source")

Review Comment:
   `defineWithStoresOldAPI()` should be unused now. Can it be removed, too?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java:
##########
@@ -1569,24 +1064,6 @@ private Topology createSimpleTopologyWithDroppingPartitioner() {
                 .addSink("sink", OUTPUT_TOPIC_1, new DroppingPartitioner(), "processor");
     }
 
-    @Deprecated // testing old PAPI
-    private Topology createStatefulTopology(final String storeName) {
-        return topology
-            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
-            .addProcessor("processor", define(new 
OldAPIStatefulProcessor(storeName)), "source")

Review Comment:
   `define()` should be unused now. Can it be removed, too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to