This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e937664969a KAFKA-20523: Fix logger class name in
KStreamKTableJoinProcessor (#22132)
e937664969a is described below
commit e937664969a00ffc4800ee3d73bfd79619ac9ad7
Author: Gavin Wang <[email protected]>
AuthorDate: Thu May 7 19:17:33 2026 -0400
KAFKA-20523: Fix logger class name in KStreamKTableJoinProcessor (#22132)
Fix logger class name in `KStreamKTableJoinProcessor` — it was
initialized with `KStreamKTableJoin.class` (the `ProcessorSupplier`)
instead of its own class, so warnings like `Skipping record due to null
join key or value` were logged under the wrong logger name and were
effectively invisible when grepping for `KStreamKTableJoinProcessor`.
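The production change is one line: pass `KStreamKTableJoinProcessor.class` to
`LoggerFactory.getLogger`. The log-capturing tests in `KStreamKTableJoinTest`
and `KStreamKTableLeftJoinTest` are updated to register their
`LogCaptureAppender` on `KStreamKTableJoinProcessor.class` accordingly.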
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java | 2 +-
.../apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java | 4 ++--
.../kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index cf7d17f0d77..decbe065211 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -45,7 +45,7 @@ import static
org.apache.kafka.streams.state.ValueTimestampHeaders.getValueOrNul
class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue,
VOut>
extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
- private static final Logger LOG =
LoggerFactory.getLogger(KStreamKTableJoin.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
private final KTableValueGetter<TableKey, TableValue> valueGetter;
private final KeyValueMapper<? super StreamKey, ? super StreamValue, ?
extends TableKey> keyMapper;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 9e913daa55c..285f81f4cc6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -453,7 +453,7 @@ public class KStreamKTableJoinTest {
@ValueSource(booleans = {false, true})
public void shouldLogAndMeterWhenSkippingNullLeftKey(final boolean
withHeaders) {
setUp(withHeaders);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
inputTopic.pipeInput(null, "A");
@@ -484,7 +484,7 @@ public class KStreamKTableJoinTest {
@ValueSource(booleans = {false, true})
public void shouldLogAndMeterWhenSkippingNullLeftValue(final boolean
withHeaders) {
setUp(withHeaders);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
inputTopic.pipeInput(1, null);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index bcb6a54d941..707e2914bc5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -221,7 +221,7 @@ public class KStreamKTableLeftJoinTest {
pushToTable(1, "Y");
processor.checkAndClearProcessResult(EMPTY);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
inputTopic.pipeInput(null, "A", 0);
@@ -251,7 +251,7 @@ public class KStreamKTableLeftJoinTest {
@ValueSource(booleans = {false, true})
public void shouldLogAndMeterWhenSkippingNullLeftValue(final boolean
withHeaders) {
setUp(withHeaders);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(),
new StringSerializer());
inputTopic.pipeInput(1, null);