[
https://issues.apache.org/jira/browse/KAFKA-19554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18010332#comment-18010332
]
Federico Valeri commented on KAFKA-19554:
-----------------------------------------
Hello, this requires a KIP. Are you going to create one?
> Add a Kafka client parameter to limit number of messages fetched
> ----------------------------------------------------------------
>
> Key: KAFKA-19554
> URL: https://issues.apache.org/jira/browse/KAFKA-19554
> Project: Kafka
> Issue Type: Improvement
> Reporter: corkitse
> Priority: Major
>
> h3. Description
> Currently, Kafka fetch requests only support limiting the total size of
> messages fetched ({{fetch.max.bytes}}) and the size per partition
> ({{max.partition.fetch.bytes}}). However, there is no way to limit the
> *number of messages* fetched per request, either globally or on a
> per-partition basis.
> While Kafka was originally designed as a high-throughput distributed
> messaging platform and has traditionally focused more on throughput than
> individual message control, its role has since evolved. Kafka is now not only
> a leading message queue but also a core component in modern *data pipelines
> and stream processing frameworks*.
> In these newer use cases, especially for downstream services and streaming
> applications, *rate-limiting by message count* is a common requirement.
> Currently, the workaround is for clients to *fetch a batch of messages,
> truncate it by count, and then reset the fetch offsets by hand*,
> which is inefficient, error-prone, and significantly reduces throughput. In
> practice, this pushes developers toward external tools such as Redis for
> additional buffering or rate control, which adds complexity
> and overhead.
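
The client-side workaround described above can be sketched roughly as follows. This is a minimal illustration in plain Java, not code from the issue: `TruncatedFetch`, `CountTruncation`, and `truncate` are hypothetical names, and records are modelled only by their offsets. With the real client, the resume offset would be applied with `KafkaConsumer.seek(TopicPartition, long)`, which discards whatever was fetched beyond the limit.

```java
import java.util.List;

/** Hypothetical holder for the outcome of truncating one partition's fetch. */
final class TruncatedFetch {
    final List<Long> keptOffsets; // offsets handed to the application
    final long resumeOffset;      // offset the next fetch must start from

    TruncatedFetch(List<Long> keptOffsets, long resumeOffset) {
        this.keptOffsets = keptOffsets;
        this.resumeOffset = resumeOffset;
    }
}

final class CountTruncation {
    /**
     * Keeps at most maxRecords records from one partition's fetch result.
     * fetchedOffsets must be in ascending order; positionAfterFetch is the
     * position the consumer would have had if nothing were dropped.
     * A negative maxRecords means "no limit" (an assumption of this sketch).
     */
    static TruncatedFetch truncate(List<Long> fetchedOffsets,
                                   long positionAfterFetch,
                                   int maxRecords) {
        if (maxRecords < 0 || fetchedOffsets.size() <= maxRecords) {
            // Nothing to drop: continue from the normal position.
            return new TruncatedFetch(fetchedOffsets, positionAfterFetch);
        }
        // The first dropped record is where the next fetch has to resume.
        long resumeOffset = fetchedOffsets.get(maxRecords);
        return new TruncatedFetch(fetchedOffsets.subList(0, maxRecords), resumeOffset);
    }
}
```

The cost the description complains about is visible here: every record after the cut-off was already transferred and decompressed, and it will be fetched again after the seek.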
> Adding *native support* for a message count limit in fetch requests would
> offer the following benefits:
> h3. Benefits
> # *Make Kafka a more mature and production-ready stream processing
> platform* by supporting more granular rate-limiting use cases.
> # *Improve overall system throughput* for consumers that need to limit by
> message count, by eliminating inefficient post-processing workarounds.
> ----
> h3. Potential Challenges
> # Due to compression and batching, Kafka consumers do not always have direct
> access to message counts in a fetch response. This means any solution would
> need to *estimate or calculate message counts indirectly*, possibly based
> on batch metadata.
> # Implementation must ensure that *Kafka’s high-performance
> characteristics are preserved*. Any support for message-count limits must
> avoid excessive decompression or deserialization on the broker side.
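
One way to read the two challenges above is that the limit could be enforced at batch granularity, using only batch metadata. The sketch below assumes a per-batch record count can be read without decompressing the payload (the v2 record batch header carries a record count field). `BatchHeader`, `CountLimitedFetch`, and `select` are hypothetical names, and always returning the first batch is an assumption made here so that a consumer can still make progress, not something the issue specifies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical view of the batch metadata needed for counting. */
final class BatchHeader {
    final long baseOffset;
    final int recordCount;

    BatchHeader(long baseOffset, int recordCount) {
        this.baseOffset = baseOffset;
        this.recordCount = recordCount;
    }
}

final class CountLimitedFetch {
    /**
     * Selects whole batches, in order, without exceeding maxNum records.
     * maxNum = -1 means no limit, mirroring the default in the schema.
     * The first batch is always returned, even if it alone exceeds maxNum.
     */
    static List<BatchHeader> select(List<BatchHeader> batches, int maxNum) {
        boolean unlimited = maxNum < 0;
        List<BatchHeader> selected = new ArrayList<>();
        long total = 0;
        for (BatchHeader batch : batches) {
            // Stop before a batch that would push the total past the limit.
            if (!unlimited && !selected.isEmpty() && total + batch.recordCount > maxNum) {
                break;
            }
            selected.add(batch);
            total += batch.recordCount;
            // Stop as soon as the limit is reached or overshot by the first batch.
            if (!unlimited && total >= maxNum) {
                break;
            }
        }
        return selected;
    }
}
```

Because batches are never split, the count the consumer receives is approximate: it can fall short of `maxNum`, or exceed it when the first batch is larger than the limit.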
> Moreover, from what I’ve observed, *many capable companies have already
> implemented their own internal forks or wrappers of Kafka to support this
> feature*, highlighting the demand and practical importance of this
> functionality. Therefore, it would be highly beneficial for Kafka to provide
> a *unified and officially supported solution*.
>
> For reference, this is the fetch request schema from our internal version,
> with the added {{MaxNum}} and {{PartitionMaxNum}} fields:
>
>
>
> {code:java}
> ...
>   "validVersions": "4-18",
>   "flexibleVersions": "12+",
>   "fields": [
>     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
>       "taggedVersions": "12+", "tag": 0, "ignorable": true,
>       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
>     { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
>       "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
>     { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1,
>       "about": "The state of the replica in the follower.", "fields": [
>       { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
>         "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
>       { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
>         "about": "The epoch of this follower, or -1 if not available." }
>     ]},
>     { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
>       "about": "The maximum time in milliseconds to wait for the response." },
>     { "name": "MinBytes", "type": "int32", "versions": "0+",
>       "about": "The minimum bytes to accumulate in the response." },
>     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
>       "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
>     { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
>       "about": "The maximum number of messages to fetch. -1 means no limit." },
>     { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
>       "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
>     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
>       "about": "The fetch session ID." },
>     { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
>       "about": "The fetch session epoch, which is used for ordering requests in a session." },
>     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
>       "about": "The topics to fetch.", "fields": [
>       { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
>         "about": "The name of the topic to fetch." },
>       { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
>         "about": "The unique topic ID." },
>       { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
>         "about": "The partitions to fetch.", "fields": [
>         { "name": "Partition", "type": "int32", "versions": "0+",
>           "about": "The partition index." },
>         { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
>           "about": "The current leader epoch of the partition." },
>         { "name": "FetchOffset", "type": "int64", "versions": "0+",
>           "about": "The message offset." },
>         { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
>           "about": "The epoch of the last fetched record or -1 if there is none." },
>         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
>           "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower." },
>         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
>           "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
>         { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
>           "about": "The maximum number of messages to fetch from this partition. -1 means no limit." },
>         { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
>           "about": "The directory id of the follower fetching." }
>       ]}
>     ]},
> ...{code}
>
> ----
>