junrao commented on code in PR #20138:
URL: https://github.com/apache/kafka/pull/20138#discussion_r2198336561
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -884,6 +897,8 @@ private void sendProduceRequest(long now, int destination,
short acks, int timeo
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
+
+ sensors.updateInconsistentTopics(batch, bufferSupplier);
Review Comment:
Could we pass in `tp` and `records` instead of batch to avoid any potential
visibility issue on batch?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -1054,6 +1077,54 @@ public void updateProduceRequestMetrics(Map<Integer,
List<ProducerBatch>> batche
}
}
+ /**
+ * Iterate over all the {@link Record}s in the batch and checks the
{@link Header}s for
+ * {@link #TOPIC_NAME_HEADER_NAME}. If the header is present, check
its topic name against the
+ * {@link ProducerBatch#topicPartition}'s topic name. If it doesn't
match, increment the metrics to
+ * track the inconsistency.
+ */
+ public void updateInconsistentTopics(ProducerBatch producerBatch,
BufferSupplier bufferSupplier) {
+ for (RecordBatch recordBatch : producerBatch.records().batches()) {
+ if (recordBatch.isControlBatch())
+ continue;
+
+ try (CloseableIterator<Record> recordIterator =
recordBatch.streamingIterator(bufferSupplier)) {
+ while (recordIterator.hasNext()) {
+ Record record = recordIterator.next();
+ Header[] headers = record.headers();
+
+ if (headers == null)
+ continue;
+
+ for (Header header : headers) {
+ if (!header.key().equals(TOPIC_NAME_HEADER_NAME))
+ continue;
+
+ String headerTopicName = new
String(header.value(), StandardCharsets.UTF_8);
+
+ if
(headerTopicName.equals(producerBatch.topicPartition.topic()))
+ continue;
+
+ log.warn(
+ "A topic mismatch was detected! ProducerBatch
topic: {}, Record header {} topic: {}",
Review Comment:
Record header {} topic: {} => Record header {}: {} since the header already
contains topic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]