mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520627804
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##########
@@ -25,36 +25,46 @@
public final class StreamStreamJoinUtil {
- private StreamStreamJoinUtil(){
+ private StreamStreamJoinUtil() {
}
public static <KIn, VIn, KOut, VOut> boolean skipRecord(
final Record<KIn, VIn> record, final Logger logger,
final Sensor droppedRecordsSensor,
- final ProcessorContext<KOut, VOut> context) {
+ final ProcessorContext<KOut, VOut> context
+ ) {
// we do join iff keys are equal, thus, if key is null we cannot join
and just ignore the record
//
// we also ignore the record if value is null, because in a key-value
data model a null-value indicates
// an empty message (i.e., there is nothing to be joined) -- this
contrasts with SQL NULL semantics
// furthermore, on left/outer joins 'null' in ValueJoiner#apply()
indicates a missing record --
// thus, to be consistent and to avoid ambiguous null semantics, null
values are ignored
if (record.key() == null || record.value() == null) {
- if (context.recordMetadata().isPresent()) {
- final RecordMetadata recordMetadata =
context.recordMetadata().get();
- logger.warn(
- "Skipping record due to null key or value. "
- + "topic=[{}] partition=[{}] offset=[{}]",
- recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset()
- );
- } else {
- logger.warn(
- "Skipping record due to null key or value. Topic,
partition, and offset not known."
- );
- }
- droppedRecordsSensor.record();
+ logSkip("null key or value", logger, droppedRecordsSensor,
context);
return true;
} else {
return false;
}
}
+
+ public static <KOut, VOut> void logSkip(
+ final String reason,
+ final Logger logger,
+ final Sensor droppedRecordsSensor,
+ final ProcessorContext<KOut, VOut> context
+ ) {
+ if (context.recordMetadata().isPresent()) {
+ final RecordMetadata recordMetadata =
context.recordMetadata().get();
+ logger.warn(
+ "Skipping record. reason=[{}] topic=[{}] partition=[{}]
offset=[{}]",
Review Comment:
```suggestion
"Skipping record. Reason=[{}] topic=[{}] partition=[{}]
offset=[{}]",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]