zheguang commented on code in PR #20353:
URL: https://github.com/apache/kafka/pull/20353#discussion_r2375406856
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##########
@@ -112,28 +208,59 @@ public void after() {
@Test
public void shouldDelegateInit() {
- final InternalMockProcessorContext context = mockContext();
- final KeyValueStore<Bytes, byte[]> innerMock = mock(InMemoryKeyValueStore.class);
+ final InternalMockProcessorContext mockContext = mockContext();
final StateStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
- outer.init(context, outer);
- verify(innerMock).init(context, outer);
+ outer.init(mockContext, outer);
+ verify(innerMock).init(mockContext, outer);
+ }
+
+ @Test
+ public void shouldWriteKeyValuePairBytesToInnerStoreOnPut() {
+ final Map<Bytes, byte[]> mockMap = new HashMap<>();
+ mockPut(mockMap);
+ mockGet(mockMap);
+ mockPosition();
+
+ store.put(hi, there);
Review Comment:
This `store.put` will eventually call `collector.send` to send the changelog
record. However, the same `collector` instance is reused across all tests
(line 79), so records sent by earlier tests may still be in the `collector`
and could cause assertions on its state to fail. If my understanding is
correct, we should instantiate a new `collector`, or reset it, before each
test.
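
To illustrate the concern, here is a minimal, self-contained sketch. It does
not use the PR's classes: `RecordingCollector` and `CollectorIsolationSketch`
are hypothetical stand-ins, and `runTest` only simulates a test whose `put`
sends one changelog record and then counts what the collector holds. Whether
the real `collector` exposes a reset method is an assumption here, not
something confirmed by the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the test's record collector (not a Kafka class).
final class RecordingCollector {
    private final List<String> sent = new ArrayList<>();

    void send(final String key, final String value) {
        sent.add(key + "=" + value);
    }

    // Returns a copy so callers cannot mutate the internal list.
    List<String> collected() {
        return new ArrayList<>(sent);
    }

    // Assumed reset hook; the real collector may or may not offer one.
    void clear() {
        sent.clear();
    }
}

final class CollectorIsolationSketch {
    // Simulates one test: a put sends one record, then the test counts records.
    static int runTest(final RecordingCollector collector, final String key, final String value) {
        collector.send(key, value);
        return collector.collected().size();
    }

    // One instance shared by both tests: the second test sees leftover records.
    static int[] withSharedCollector() {
        final RecordingCollector shared = new RecordingCollector();
        return new int[] {runTest(shared, "hi", "there"), runTest(shared, "hello", "world")};
    }

    // A new instance per test, as a per-test setup method would create.
    static int[] withFreshCollectorPerTest() {
        return new int[] {
            runTest(new RecordingCollector(), "hi", "there"),
            runTest(new RecordingCollector(), "hello", "world")
        };
    }

    // A shared instance that is reset between tests.
    static int[] withResetBetweenTests() {
        final RecordingCollector shared = new RecordingCollector();
        final int first = runTest(shared, "hi", "there");
        shared.clear();
        final int second = runTest(shared, "hello", "world");
        return new int[] {first, second};
    }

    public static void main(final String[] args) {
        System.out.println(Arrays.toString(withSharedCollector()));       // [1, 2]
        System.out.println(Arrays.toString(withFreshCollectorPerTest())); // [1, 1]
        System.out.println(Arrays.toString(withResetBetweenTests()));     // [1, 1]
    }
}
```

With the shared instance, the second simulated test observes two records
even though it sent only one, which is the kind of cross-test leakage that
could break an assertion depending on test order. Either creating the
collector in a per-test setup method or resetting it there would avoid this.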