[
https://issues.apache.org/jira/browse/KAFKA-10438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chia-Ping Tsai resolved KAFKA-10438.
------------------------------------
Fix Version/s: 2.8.0
Resolution: Fixed
> Lazy initialization of record header to reduce memory usage in validating
> records
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-10438
> URL: https://issues.apache.org/jira/browse/KAFKA-10438
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
> Fix For: 2.8.0
>
>
> {code}
> private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record,
>                            batchIndex: Int, now: Long, timestampType: TimestampType,
>                            timestampDiffMaxMs: Long, compactedTopic: Boolean,
>                            brokerTopicStats: BrokerTopicStats): Option[ApiRecordError] = {
>   if (!record.hasMagic(batch.magic)) {
>     brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
>     return Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
>       s"Record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")))
>   }
>
>   // verify the record-level CRC only if this is one of the deep entries of a compressed message
>   // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1,
>   // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). For magic v2 and above,
>   // there is no record-level CRC to check.
>   if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) {
>     try {
>       record.ensureValid()
>     } catch {
>       case e: InvalidRecordException =>
>         brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
>         throw new CorruptRecordException(e.getMessage + s" in topic partition $topicPartition.")
>     }
>   }
>
>   validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats).orElse {
>     validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
>   }
> }
> {code}
> Record validation performs no checks on the header key, so instantiating
> the key (decoding its bytes into a String) is unnecessary.
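
For illustration only, here is a minimal sketch of the lazy-initialization idea, assuming the header keeps its raw key bytes and decodes them on first access. The class LazyHeader and its member keyDecoded are hypothetical names invented for this example; they are not Kafka's header API and not necessarily how the fix was implemented.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a record header; not Kafka's actual class.
final class LazyHeader(keyBuffer: ByteBuffer) {
  // Tracks whether the key has been decoded, purely for demonstration.
  @volatile private var decoded = false

  // Decoded from UTF-8 only on first access. Code paths that never read
  // the key (such as validation) never allocate the String.
  lazy val key: String = {
    decoded = true
    // duplicate() leaves the position of the original buffer untouched.
    StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString
  }

  def keyDecoded: Boolean = decoded
}
```

With this shape, constructing a header costs only a buffer reference; the String is allocated the first time key is read and is cached afterwards.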
--
This message was sent by Atlassian Jira
(v8.3.4#803005)