This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e937664969a KAFKA-20523: Fix logger class name in KStreamKTableJoinProcessor (#22132)
e937664969a is described below

commit e937664969a00ffc4800ee3d73bfd79619ac9ad7
Author: Gavin Wang <[email protected]>
AuthorDate: Thu May 7 19:17:33 2026 -0400

    KAFKA-20523: Fix logger class name in KStreamKTableJoinProcessor (#22132)
    
    Fix logger class name in `KStreamKTableJoinProcessor` — it was
    initialized with `KStreamKTableJoin.class` (the `ProcessorSupplier`)
    instead of its own class, so warnings like `Skipping record due to null
    join key or value` were logged under the wrong logger name and were
    effectively invisible when grepping for `KStreamKTableJoinProcessor`.
    
    One-line change: pass `KStreamKTableJoinProcessor.class` to
    `LoggerFactory.getLogger`.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java   | 2 +-
 .../apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java | 4 ++--
 .../kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java    | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index cf7d17f0d77..decbe065211 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -45,7 +45,7 @@ import static org.apache.kafka.streams.state.ValueTimestampHeaders.getValueOrNul
 class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue, VOut>
     extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
 
     private final KTableValueGetter<TableKey, TableValue> valueGetter;
     private final KeyValueMapper<? super StreamKey, ? super StreamValue, ? extends TableKey> keyMapper;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 9e913daa55c..285f81f4cc6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -453,7 +453,7 @@ public class KStreamKTableJoinTest {
     @ValueSource(booleans = {false, true})
     public void shouldLogAndMeterWhenSkippingNullLeftKey(final boolean withHeaders) {
         setUp(withHeaders);
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
             final TestInputTopic<Integer, String> inputTopic =
                 driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
             inputTopic.pipeInput(null, "A");
@@ -484,7 +484,7 @@ public class KStreamKTableJoinTest {
     @ValueSource(booleans = {false, true})
     public void shouldLogAndMeterWhenSkippingNullLeftValue(final boolean withHeaders) {
         setUp(withHeaders);
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
             final TestInputTopic<Integer, String> inputTopic =
                 driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
             inputTopic.pipeInput(1, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index bcb6a54d941..707e2914bc5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -221,7 +221,7 @@ public class KStreamKTableLeftJoinTest {
         pushToTable(1, "Y");
         processor.checkAndClearProcessResult(EMPTY);
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
             final TestInputTopic<Integer, String> inputTopic =
                 driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
             inputTopic.pipeInput(null, "A", 0);
@@ -251,7 +251,7 @@ public class KStreamKTableLeftJoinTest {
     @ValueSource(booleans = {false, true})
     public void shouldLogAndMeterWhenSkippingNullLeftValue(final boolean withHeaders) {
         setUp(withHeaders);
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoinProcessor.class)) {
             final TestInputTopic<Integer, String> inputTopic =
                 driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
             inputTopic.pipeInput(1, null);

Reply via email to