iit2009060 commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2893970279
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords
= response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
- final RecordCollector collector = ((RecordCollector.Supplier)
internalProcessorContext).recordCollector();
- for (final ProducerRecord<byte[], byte[]>
deadLetterQueueRecord : deadLetterQueueRecords) {
- collector.send(
- deadLetterQueueRecord.key(),
- deadLetterQueueRecord.value(),
- name(),
- internalProcessorContext,
- deadLetterQueueRecord
- );
+ if (!(internalProcessorContext instanceof
RecordCollector.Supplier)) {
+ log.warn("Dead letter queue records cannot be sent for
GlobalKTable processors " +
Review Comment:
done
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords
= response.deadLetterQueueRecords();
if (!deadLetterQueueRecords.isEmpty()) {
- final RecordCollector collector = ((RecordCollector.Supplier)
internalProcessorContext).recordCollector();
- for (final ProducerRecord<byte[], byte[]>
deadLetterQueueRecord : deadLetterQueueRecords) {
- collector.send(
- deadLetterQueueRecord.key(),
- deadLetterQueueRecord.value(),
- name(),
- internalProcessorContext,
- deadLetterQueueRecord
- );
+ if (!(internalProcessorContext instanceof
RecordCollector.Supplier)) {
+ log.warn("Dead letter queue records cannot be sent for
GlobalKTable processors " +
+ "(no producer available). DLQ support for
GlobalKTable will be addressed in a future KIP. " + "Record context: {}",
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]