chia7712 commented on code in PR #16396:
URL: https://github.com/apache/kafka/pull/16396#discussion_r1650082583
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -180,9 +169,10 @@ private void setupStore() {
when(store.name()).thenReturn(storeName);
}
- @Test
- public void shouldNotRegisterSameStoreMultipleTimes() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -199,9 +189,10 @@ public void shouldNotRegisterStoreWithoutMetadata() {
() -> changelogReader.register(new
TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
}
- @Test
- public void shouldSupportUnregisterChangelogBeforeInitialization() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -280,9 +272,10 @@ public void
shouldSupportUnregisterChangelogBeforeCompletion() {
assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
}
- @Test
- public void shouldSupportUnregisterChangelogAfterCompletion() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -549,9 +548,10 @@ public void
shouldRestoreFromPositionAndCheckForCompletion() {
}
}
- @Test
- public void shouldRestoreFromBeginningAndCheckCompletion() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##########
@@ -380,17 +372,31 @@ public void setUp() {
}
}
- @Parameterized.Parameters(name = "rackAwareStrategy={0}")
- public static Collection<Object[]> getParamStoreType() {
- return asList(new Object[][] {
- {StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE},
- {StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC},
- {StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY},
- });
+// @Parameterized.Parameters(name = "rackAwareStrategy={0}")
Review Comment:
please remove those code
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -338,9 +331,10 @@ public void
shouldSupportUnregisterChangelogAfterCompletion() {
}
}
- @Test
- public void shouldInitializeChangelogAndCheckForCompletion() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java:
##########
@@ -71,23 +73,27 @@ public class KeyValueStoreMaterializerTest {
@Mock
private StreamsConfig streamsConfig;
- @Before
Review Comment:
So we can keep using `@BeforeEach` now, right?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -228,9 +219,10 @@ public void
shouldSupportUnregisterChangelogBeforeInitialization() {
assertNull(standbyListener.capturedStore(UPDATE_BATCH));
}
- @Test
- public void shouldSupportUnregisterChangelogBeforeCompletion() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -380,11 +374,12 @@ public void
shouldInitializeChangelogAndCheckForCompletion() {
}
}
- @Test
- public void
shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() {
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -412,38 +407,41 @@ public long position(final TopicPartition partition) {
}
}
- @Test
- public void shouldPollWithRightTimeoutWithStateUpdater() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -476,9 +474,10 @@ private void shouldPollWithRightTimeout(final Properties
properties) {
}
}
- @Test
- public void shouldRestoreFromPositionAndCheckForCompletion() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -828,9 +828,10 @@ public ListOffsetsResult listOffsets(final
Map<TopicPartition, OffsetSpec> topic
assertEquals(kaboom, thrown.getCause());
}
- @Test
- public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -412,38 +407,41 @@ public long position(final TopicPartition partition) {
}
}
- @Test
- public void shouldPollWithRightTimeoutWithStateUpdater() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
+ public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType
type) {
+ setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
- shouldPollWithRightTimeout(true);
+ shouldPollWithRightTimeout(true, type);
}
- @Test
- public void shouldPollWithRightTimeoutWithoutStateUpdater() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##########
@@ -412,38 +407,41 @@ public long position(final TopicPartition partition) {
}
}
- @Test
- public void shouldPollWithRightTimeoutWithStateUpdater() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
+ public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType
type) {
+ setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
- shouldPollWithRightTimeout(true);
+ shouldPollWithRightTimeout(true, type);
}
- @Test
- public void shouldPollWithRightTimeoutWithoutStateUpdater() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
+ public void shouldPollWithRightTimeoutWithoutStateUpdater(final
Task.TaskType type) {
+ setupStateManagerMock(type);
setupStoreMetadata();
setupStore();
- shouldPollWithRightTimeout(false);
+ shouldPollWithRightTimeout(false, type);
}
- private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled)
{
+ private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled,
final Task.TaskType type) {
final Properties properties = new Properties();
properties.put(InternalConfig.STATE_UPDATER_ENABLED,
stateUpdaterEnabled);
- shouldPollWithRightTimeout(properties);
+ shouldPollWithRightTimeout(properties, type);
}
- @Test
- public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
- setupStateManagerMock();
+ @ParameterizedTest
+ @EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE,
names = {"ACTIVE", "STANDBY"})
Review Comment:
`mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default
value
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]